[release] Merge branch 'v3.2.0'

This commit is contained in:
tjq 2020-07-19 23:54:15 +08:00
commit 97b00a7c33
61 changed files with 509 additions and 141 deletions

View File

@ -68,7 +68,7 @@ PS感谢文档翻译平台[breword](https://www.breword.com/)对本项目英
* [广播执行](https://yq.aliyun.com/articles/716203?spm=a2c4e.11153959.teamhomeleft.40.371960c9qhB1mB)运行清理日志脚本什么的也太实用了8
# 其他
* 产品永久开源Apache License, Version 2.0),免费使用,且目前开发者@KFCFans有充足的时间进行项目维护和提供无偿技术支持All In 了解一下),欢迎各位试用!
* 开源许可证Apache License, Version 2.0
* 欢迎共同参与本项目的贡献PR和Issue都大大滴欢迎求求了
* 觉得还不错的话可以点个Star支持一下哦 = ̄ω ̄=
* 联系方式@KFCFans -> `tengjiqi@gmail.com`

View File

@ -9,7 +9,7 @@
<version>1.0.0</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>https://github.com/KFCFans/OhMyScheduler</url>
<url>https://github.com/KFCFans/PowerJob</url>
<description>Distributed scheduling and execution framework</description>
<licenses>
<license>
@ -19,8 +19,8 @@
</license>
</licenses>
<scm>
<url>https://github.com/KFCFans/OhMyScheduler</url>
<connection>https://github.com/KFCFans/OhMyScheduler.git</connection>
<url>https://github.com/KFCFans/PowerJob</url>
<connection>https://github.com/KFCFans/PowerJob.git</connection>
</scm>
<developers>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.1.3</powerjob.common.version>
<powerjob.common.version>3.2.0</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>jar</packaging>
<properties>

View File

@ -28,6 +28,8 @@ public enum InstanceStatus {
// 广义的运行状态
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
// 结束状态
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
public static InstanceStatus of(int v) {
for (InstanceStatus is : values()) {

View File

@ -17,6 +17,7 @@ public class RemoteConstant {
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
public static final String WORKER_ACTOR_NAME = "worker";
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
@ -26,6 +27,7 @@ public class RemoteConstant {
public static final String SERVER_ACTOR_NAME = "server_actor";
public static final String SERVER_FRIEND_ACTOR_NAME = "friend_actor";
public static final String SERVER_TROUBLESHOOTING_ACTOR_NAME = "server_troubleshooting_actor";
public static final String SERVER_AKKA_CONFIG_NAME = "oms-server.akka.conf";

View File

@ -7,7 +7,7 @@ import lombok.NoArgsConstructor;
import java.util.List;
/**
* 任务实例的运行详细信息对外
* 任务实例的运行详细信息
*
* @author tjq
* @since 2020/4/11
@ -20,7 +20,7 @@ public class InstanceDetail implements OmsSerializable {
private Long actualTriggerTime;
// 任务整体结束时间可能不存在
private Long finishedTime;
// 任务状态中文
// 任务状态
private Integer status;
// 任务执行结果可能不存在
private String result;
@ -35,13 +35,16 @@ public class InstanceDetail implements OmsSerializable {
// 重试次数
private Long runningTimes;
// 扩展字段中间件升级不易最好不要再改 common 包了...否则 server worker 版本不兼容
private String extra;
// 秒级任务的 extra -> List<SubInstanceDetail>
@Data
@NoArgsConstructor
public static class SubInstanceDetail implements OmsSerializable {
private long subInstanceId;
private String startTime;
private String finishedTime;
private Long startTime;
private Long finishedTime;
private String result;
private int status;
}

View File

@ -27,6 +27,10 @@ public class WorkerHeartbeat implements OmsSerializable {
private long heartbeatTime;
// 当前加载的容器容器名称 -> 容器版本
private List<DeployedContainerInfo> containerInfos;
// worker 版本信息
private String version;
// 扩展字段
private String extra;
private SystemMetrics systemMetrics;
}

View File

@ -46,8 +46,8 @@ public class SaveJobInfoRequest {
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数
private Integer maxInstanceNum = 1;
// 最大同时运行任务数0 代表不限
private Integer maxInstanceNum = 0;
// 并发度同时执行的线程数量
private Integer concurrency = 5;
// 任务整体超时时间

View File

@ -1,11 +1,12 @@
package com.github.kfcfans.powerjob.common.utils;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.OmsException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
@ -129,4 +130,20 @@ public class CommonUtils {
return obj;
}
/**
 * Formats a millisecond epoch timestamp into a human-readable time string
 * using the project-wide pattern {@code OmsConstant.TIME_PATTERN}.
 *
 * @param ts millisecond timestamp; may be {@code null}
 * @return the formatted time, or {@code OmsConstant.NONE} when {@code ts} is
 *         {@code null}, non-positive, or formatting fails for any reason
 */
public static String formatTime(Long ts) {
    if (ts == null || ts <= 0) {
        return OmsConstant.NONE;
    }
    try {
        return DateFormatUtils.format(ts, OmsConstant.TIME_PATTERN);
    } catch (Exception ignore) {
        // Display helper: never propagate a formatting failure to callers,
        // fall back to the "none" placeholder instead.
        return OmsConstant.NONE;
    }
}
}

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<powerjob.common.version>3.2.0</powerjob.common.version>
<mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>

View File

@ -1,12 +1,12 @@
package com.github.kfcfans.powerjob.server.akka;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.*;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerTroubleshootingActor;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
@ -58,9 +58,15 @@ public class OhMyServer {
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ServerActor.class)
.withDispatcher("akka.server-actor-dispatcher")
.withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME);
actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
// 处理系统中产生的异常情况
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(ServerTroubleshootingActor.class), RemoteConstant.SERVER_TROUBLESHOOTING_ACTOR_NAME);
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch);
}

View File

@ -63,7 +63,7 @@ public class ServerActor extends AbstractActor {
getInstanceManager().updateStatus(req);
// 结束状态成功/失败需要回复消息
if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) {
if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
getSender().tell(AskResponse.succeed(null), getSelf());
}
}catch (Exception e) {

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.powerjob.server.akka.actors;
import akka.actor.AbstractActor;
import akka.actor.DeadLetter;
import lombok.extern.slf4j.Slf4j;
/**
 * Actor that handles server-side trouble signals.
 * It is subscribed to the ActorSystem event stream so that undeliverable
 * messages ({@link DeadLetter}) are logged instead of vanishing silently.
 *
 * @author tjq
 * @since 2020/7/18
 */
@Slf4j
public class ServerTroubleshootingActor extends AbstractActor {

    @Override
    public Receive createReceive() {
        // Only DeadLetter events are expected; everything else is ignored.
        return receiveBuilder()
                .match(DeadLetter.class, deadLetter -> onReceiveDeadLetter(deadLetter))
                .build();
    }

    /**
     * Logs an undeliverable message so operators can diagnose routing issues.
     *
     * @param deadLetter the undeliverable message wrapper published by Akka
     */
    public void onReceiveDeadLetter(DeadLetter deadLetter) {
        log.warn("[ServerTroubleshootingActor] receive DeadLetter: {}", deadLetter);
    }
}

View File

@ -57,7 +57,11 @@ public class CoreJpaConfig {
HibernateProperties hibernateProperties = new HibernateProperties();
hibernateProperties.setDdlAuto("update");
return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
// 配置JPA自定义表名称策略
hibernateProperties.getNaming().setPhysicalStrategy(PowerJobPhysicalNamingStrategy.class.getName());
HibernateSettings hibernateSettings = new HibernateSettings();
return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), hibernateSettings);
}
@Primary
@ -78,5 +82,4 @@ public class CoreJpaConfig {
public PlatformTransactionManager initCoreTransactionManager(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(Objects.requireNonNull(initCoreEntityManagerFactory(builder).getObject()));
}
}

View File

@ -0,0 +1,46 @@
package com.github.kfcfans.powerjob.server.persistence.config;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import org.hibernate.boot.model.naming.Identifier;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
import org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy;
import org.springframework.util.StringUtils;
import java.io.Serializable;
/**
 * Physical naming strategy that applies an optional table prefix taken from
 * the configuration property {@code oms.table-prefix}; when the property is
 * absent or empty, no prefix is added.
 * Reference implementation: {@link org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy}.
 * <p>
 * 1. Extends the Spring physical naming strategy to customize the table prefix.
 * </p>
 * <p>
 * 2. Native queries ({@code @Query(nativeQuery = true)}) were rewritten to JPQL
 * using entity and property names instead of raw table/column names, so they
 * stay valid regardless of the configured prefix.
 * </p>
 *
 * @author songyinyin
 * @since 2020/7/18
 */
public class PowerJobPhysicalNamingStrategy extends SpringPhysicalNamingStrategy implements Serializable {

    /**
     * Maps an entity name to its physical table name: strips the trailing
     * "DO" from entity names such as {@code AppInfoDO}, then prepends the
     * configured table prefix (if any).
     *
     * @param name            the logical entity identifier
     * @param jdbcEnvironment the JDBC environment
     * @return the mapped physical table identifier
     */
    @Override
    public Identifier toPhysicalTableName(Identifier name, JdbcEnvironment jdbcEnvironment) {
        String tablePrefix = PropertyUtils.getProperties().getProperty("oms.table-prefix");
        String entityName = name.getText();
        // Drop the conventional "DO" suffix (case-insensitive), e.g. AppInfoDO -> AppInfo.
        String baseName = StringUtils.endsWithIgnoreCase(entityName, "do")
                ? entityName.substring(0, entityName.length() - 2)
                : entityName;
        String physicalName = StringUtils.hasLength(tablePrefix) ? tablePrefix + baseName : baseName;
        // Delegate to Spring's strategy for the usual camelCase -> snake_case conversion.
        return super.toPhysicalTableName(new Identifier(physicalName, name.isQuoted()), jdbcEnvironment);
    }
}

View File

@ -13,7 +13,7 @@ import java.util.Date;
*/
@Data
@Entity
@Table(name = "app_info", uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})})
@Table(uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})})
public class AppInfoDO {
@Id

View File

@ -13,7 +13,7 @@ import java.util.Date;
*/
@Data
@Entity
@Table(name = "container_info", indexes = {@Index(columnList = "appId")})
@Table(indexes = {@Index(columnList = "appId")})
public class ContainerInfoDO {
@Id

View File

@ -18,7 +18,7 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "instance_info", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")})
@Table(indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")})
public class InstanceInfoDO {
@Id

View File

@ -18,7 +18,7 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "job_info", indexes = {@Index(columnList = "appId")})
@Table(indexes = {@Index(columnList = "appId")})
public class JobInfoDO {

View File

@ -15,7 +15,7 @@ import java.util.Date;
@Data
@Entity
@NoArgsConstructor
@Table(name = "oms_lock", uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})})
@Table(uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})})
public class OmsLockDO {
@Id

View File

@ -15,7 +15,7 @@ import java.util.Date;
@Data
@Entity
@NoArgsConstructor
@Table(name = "server_info", uniqueConstraints = {@UniqueConstraint(columnNames = "ip")})
@Table(uniqueConstraints = {@UniqueConstraint(columnNames = "ip")})
public class ServerInfoDO {
@Id

View File

@ -13,7 +13,7 @@ import java.util.Date;
*/
@Data
@Entity
@Table(name = "user_info")
@Table
public class UserInfoDO {
@Id

View File

@ -17,7 +17,7 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "workflow_info", indexes = {@Index(columnList = "appId")})
@Table(indexes = {@Index(columnList = "appId")})
public class WorkflowInfoDO {
@Id

View File

@ -17,7 +17,7 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "workflow_instance_info")
@Table
public class WorkflowInstanceInfoDO {
@Id

View File

@ -37,19 +37,19 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
@Transactional
@Modifying
@CanIgnoreReturnValue
@Query(value = "update instance_info set status = ?2, running_times = ?3, actual_trigger_time = ?4, finished_time = ?5, task_tracker_address = ?6, result = ?7, instance_params = ?8, gmt_modified = ?9 where instance_id = ?1", nativeQuery = true)
@Query(value = "update InstanceInfoDO set status = ?2, runningTimes = ?3, actualTriggerTime = ?4, finishedTime = ?5, taskTrackerAddress = ?6, result = ?7, instanceParams = ?8, gmtModified = ?9 where instanceId = ?1")
int update4TriggerFailed(long instanceId, int status, long runningTimes, long actualTriggerTime, long finishedTime, String taskTrackerAddress, String result, String instanceParams, Date modifyTime);
@Transactional
@Modifying
@CanIgnoreReturnValue
@Query(value = "update instance_info set status = ?2, running_times = ?3, actual_trigger_time = ?4, task_tracker_address = ?5, instance_params = ?6, gmt_modified = ?7 where instance_id = ?1", nativeQuery = true)
@Query(value = "update InstanceInfoDO set status = ?2, runningTimes = ?3, actualTriggerTime = ?4, taskTrackerAddress = ?5, instanceParams = ?6, gmtModified = ?7 where instanceId = ?1")
int update4TriggerSucceed(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress, String instanceParams, Date modifyTime);
@Modifying
@Transactional
@CanIgnoreReturnValue
@Query(value = "update instance_info set status = ?2, running_times = ?3, gmt_modified = ?4 where instance_id = ?1", nativeQuery = true)
@Query(value = "update InstanceInfoDO set status = ?2, runningTimes = ?3, gmtModified = ?4 where instanceId = ?1")
int update4FrequentJob(long instanceId, int status, long runningTimes, Date modifyTime);
// 状态检查三兄弟对应 WAITING_DISPATCH WAITING_WORKER_RECEIVE RUNNING 三阶段
@ -64,13 +64,13 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
long countByAppIdAndStatus(long appId, int status);
long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time);
@Query(value = "select job_id from instance_info where job_id in ?1 and status in ?2", nativeQuery = true)
@Query(value = "select distinct jobId from InstanceInfoDO where jobId in ?1 and status in ?2")
List<Long> findByJobIdInAndStatusIn(List<Long> jobIds, List<Integer> status);
// 删除历史数据JPA自带的删除居然是根据ID循环删2000条数据删了几秒也太拉垮了吧...
// 结果只能用 int 接收
@Modifying
@Transactional
@Query(value = "delete from instance_info where gmt_modified < ?1", nativeQuery = true)
@Query(value = "delete from InstanceInfoDO where gmtModified < ?1")
int deleteAllByGmtModifiedBefore(Date time);
}

View File

@ -20,7 +20,7 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
// 调度专用
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
@Query(value = "select id from job_info where app_id in ?1 and status = ?2 and time_expression_type in ?3", nativeQuery = true)
@Query(value = "select id from JobInfoDO where appId in ?1 and status = ?2 and timeExpressionType in ?3")
List<Long> findByAppIdInAndStatusAndTimeExpressionTypeIn(List<Long> appIds, int status, List<Integer> timeTypes);
Page<JobInfoDO> findByAppIdAndStatusNot(Long appId, int status, Pageable pageable);

View File

@ -17,7 +17,7 @@ public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Modifying
@Transactional
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
@Query(value = "delete from OmsLockDO where lockName = ?1")
int deleteByLockName(String lockName);
OmsLockDO findByLockName(String lockName);

View File

@ -24,7 +24,7 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowIn
// 结果只能用 int 接收
@Modifying
@Transactional
@Query(value = "delete from workflow_instance_info where gmt_modified < ?1", nativeQuery = true)
@Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1")
int deleteAllByGmtModifiedBefore(Date time);
int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);

View File

@ -68,16 +68,20 @@ public class DispatchService {
// 查询当前运行的实例数
long current = System.currentTimeMillis();
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
// 0 代表不限制在线任务还能省去一次 DB 查询
if (jobInfo.getMaxInstanceNum() > 0) {
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
return;
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
return;
}
}
// 获取当前所有可用的Worker

View File

@ -194,7 +194,7 @@ public class InstanceService {
}
}catch (Exception e) {
log.error("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString());
log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString());
}
// 失败则返回基础版信息

View File

@ -35,17 +35,14 @@ public class CleanService {
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Value("${oms.log.retention.local}")
private int localLogRetentionDay;
@Value("${oms.log.retention.remote}")
private int remoteLogRetentionDay;
@Value("${oms.instanceinfo.retention}")
private int instanceInfoRetentionDay;
@Value("${oms.container.retention.local}")
private int localContainerRetentionDay;
@Value("${oms.container.retention.remote}")
private int remoteContainerRetentionDay;
@Value("${oms.instanceinfo.retention}")
private int instanceInfoRetentionDay;
private static final int TEMPORARY_RETENTION_DAY = 3;
@ -65,12 +62,12 @@ public class CleanService {
cleanWorkflowInstanceLog();
// 释放磁盘空间
cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay);
cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay);
cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);
// 删除 GridFS 过期文件
cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay);
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
}

View File

@ -48,12 +48,16 @@ public class WorkflowInstanceService {
// 停止所有已启动且未完成的服务
PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
WorkflowDAGUtils.listRoots(workflowDAG).forEach(node -> {
if (node.getInstanceId() != null && InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
node.setStatus(InstanceStatus.STOPPED.getV());
node.setResult(SystemInstanceResult.STOPPED_BY_USER);
try {
if (node.getInstanceId() != null && InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
node.setStatus(InstanceStatus.STOPPED.getV());
node.setResult(SystemInstanceResult.STOPPED_BY_USER);
instanceService.stopInstance(node.getInstanceId());
instanceService.stopInstance(node.getInstanceId());
}
}catch (Exception e) {
log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSONObject.toJSONString(node), e);
}
});

View File

@ -8,9 +8,11 @@ import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -86,6 +88,11 @@ public class WebLogAspect {
if (obj instanceof HttpServletRequest || obj instanceof HttpServletResponse) {
break;
}
// FatJar
if (obj instanceof MultipartFile || obj instanceof Resource) {
break;
}
objList.add(obj);
}
return JSONObject.toJSONString(objList);

View File

@ -1,7 +1,6 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
@ -15,6 +14,7 @@ import com.github.kfcfans.powerjob.server.service.CacheService;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest;
import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO;
import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
@ -65,8 +65,8 @@ public class InstanceController {
}
@GetMapping("/detail")
public ResultDTO<InstanceDetail> getInstanceDetail(String instanceId) {
return ResultDTO.success(instanceService.getInstanceDetail(Long.valueOf(instanceId)));
public ResultDTO<InstanceDetailVO> getInstanceDetail(String instanceId) {
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(Long.valueOf(instanceId))));
}
@GetMapping("/log")

View File

@ -1,11 +1,7 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.PageResult;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
@ -15,7 +11,6 @@ import com.github.kfcfans.powerjob.server.web.request.QueryJobInfoRequest;
import com.github.kfcfans.powerjob.server.web.response.JobInfoVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
@ -90,7 +85,7 @@ public class JobController {
if (jobInfoOpt.isPresent()) {
result.setTotalItems(1);
result.setTotalPages(1);
result.setData(Lists.newArrayList(convert(jobInfoOpt.get())));
result.setData(Lists.newArrayList(JobInfoVO.from(jobInfoOpt.get())));
}else {
result.setTotalPages(0);
result.setTotalItems(0);
@ -108,34 +103,11 @@ public class JobController {
private static PageResult<JobInfoVO> convertPage(Page<JobInfoDO> jobInfoPage) {
List<JobInfoVO> jobInfoVOList = jobInfoPage.getContent().stream().map(JobController::convert).collect(Collectors.toList());
List<JobInfoVO> jobInfoVOList = jobInfoPage.getContent().stream().map(JobInfoVO::from).collect(Collectors.toList());
PageResult<JobInfoVO> pageResult = new PageResult<>(jobInfoPage);
pageResult.setData(jobInfoVOList);
return pageResult;
}
private static JobInfoVO convert(JobInfoDO jobInfoDO) {
JobInfoVO jobInfoVO = new JobInfoVO();
BeanUtils.copyProperties(jobInfoDO, jobInfoVO);
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType());
ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType());
jobInfoVO.setTimeExpressionType(timeExpressionType.name());
jobInfoVO.setExecuteType(executeType.name());
jobInfoVO.setProcessorType(processorType.name());
jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV());
if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) {
jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds()));
}else {
jobInfoVO.setNotifyUserIds(Lists.newLinkedList());
}
return jobInfoVO;
}
}

View File

@ -0,0 +1,98 @@
package com.github.kfcfans.powerjob.server.web.response;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Web-facing view object carrying the detailed runtime information of a job
 * instance.
 * NOTE: all date formatting must happen on the server, never in the browser;
 * otherwise the displayed time is wrong whenever server and browser time
 * zones differ.
 *
 * @author tjq
 * @since 2020/7/18
 */
@Data
@NoArgsConstructor
public class InstanceDetailVO {

    // Overall start time of the instance (pre-formatted string, server time zone)
    private String actualTriggerTime;
    // Overall finish time of the instance (pre-formatted string; may be absent)
    private String finishedTime;
    // Instance status code
    private Integer status;
    // Execution result (may be absent)
    private String result;
    // Address of the TaskTracker running this instance
    private String taskTrackerAddress;
    // Only for MapReduce / Broadcast jobs
    private InstanceDetailVO.TaskDetail taskDetail;
    // Only for second-level (frequent) jobs
    private List<InstanceDetailVO.SubInstanceDetail> subInstanceDetails;
    // Number of retries
    private Long runningTimes;

    // "extra" payload of a second-level job -> List<SubInstanceDetail>
    @Data
    @NoArgsConstructor
    public static class SubInstanceDetail implements OmsSerializable {
        private long subInstanceId;
        // Formatted on the server from the millisecond timestamp
        private String startTime;
        // Formatted on the server from the millisecond timestamp
        private String finishedTime;
        private String result;
        private int status;
    }

    // "extra" payload of a MapReduce / Broadcast job -> task counters
    @Data
    @NoArgsConstructor
    public static class TaskDetail implements OmsSerializable {
        private long totalTaskNum;
        private long succeedTaskNum;
        private long failedTaskNum;
    }

    /**
     * Converts the common-layer {@code InstanceDetail} (raw millisecond
     * timestamps) into this view object (pre-formatted time strings).
     *
     * @param origin the common-layer detail object
     * @return the populated view object
     */
    public static InstanceDetailVO from(InstanceDetail origin) {
        InstanceDetailVO vo = new InstanceDetailVO();
        // Spring BeanUtils skips properties whose types differ (e.g. the Long
        // timestamps vs. the String fields here), so only same-typed fields
        // are copied; the time fields are set explicitly below.
        BeanUtils.copyProperties(origin, vo);
        // Format the timestamps (server-side, see class-level note)
        vo.setFinishedTime(CommonUtils.formatTime(origin.getFinishedTime()));
        vo.setActualTriggerTime(CommonUtils.formatTime(origin.getActualTriggerTime()));
        // Copy the TaskDetail counters, if present
        if (origin.getTaskDetail() != null) {
            TaskDetail voDetail = new TaskDetail();
            BeanUtils.copyProperties(origin.getTaskDetail(), voDetail);
            vo.setTaskDetail(voDetail);
        }
        // Copy the second-level job sub-instance data, if present
        if (!CollectionUtils.isEmpty(origin.getSubInstanceDetails())) {
            vo.subInstanceDetails = Lists.newLinkedList();
            origin.getSubInstanceDetails().forEach(subDetail -> {
                SubInstanceDetail voSubDetail = new SubInstanceDetail();
                BeanUtils.copyProperties(subDetail, voSubDetail);
                // Format the timestamps (server-side)
                voSubDetail.setStartTime(CommonUtils.formatTime(subDetail.getStartTime()));
                voSubDetail.setFinishedTime(CommonUtils.formatTime(subDetail.getFinishedTime()));
                vo.subInstanceDetails.add(voSubDetail);
            });
        }
        return vo;
    }
}

View File

@ -1,6 +1,16 @@
package com.github.kfcfans.powerjob.server.web.response;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.google.common.collect.Lists;
import lombok.Data;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import java.util.Date;
import java.util.List;
@ -56,6 +66,8 @@ public class JobInfoVO {
private boolean enable;
// 下一次调度时间
private Long nextTriggerTime;
// 下一次调度时间文字版
private String nextTriggerTimeStr;
/* ************************** 繁忙机器配置 ************************** */
// 最低CPU核心数量0代表不限
@ -76,4 +88,27 @@ public class JobInfoVO {
// 报警用户ID列表
private List<String> notifyUserIds;
/**
 * Converts a {@code JobInfoDO} persistence entity into its web view object.
 *
 * @param jobInfoDO the persisted job entity
 * @return the populated view object
 */
public static JobInfoVO from(JobInfoDO jobInfoDO) {
    JobInfoVO jobInfoVO = new JobInfoVO();
    // Copies all same-named, same-typed properties; enum-coded ints and
    // special fields are translated explicitly below.
    BeanUtils.copyProperties(jobInfoDO, jobInfoVO);

    // Translate the stored int codes into their enum names for display
    TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
    ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType());
    ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType());
    jobInfoVO.setTimeExpressionType(timeExpressionType.name());
    jobInfoVO.setExecuteType(executeType.name());
    jobInfoVO.setProcessorType(processorType.name());
    jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV());

    // Split the comma-separated user-id string into a list; empty -> empty list
    if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) {
        jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds()));
    }else {
        jobInfoVO.setNotifyUserIds(Lists.newLinkedList());
    }

    // Server-side formatted copy of nextTriggerTime (avoids browser time-zone issues)
    jobInfoVO.setNextTriggerTimeStr(CommonUtils.formatTime(jobInfoDO.getNextTriggerTime()));
    return jobInfoVO;
}
}

View File

@ -3,7 +3,7 @@ logging.config=classpath:logback-dev.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 资源清理配置 #######
oms.log.retention.local=1
oms.log.retention.remote=1
oms.instanceinfo.retention=1
oms.container.retention.local=1
oms.container.retention.remote=-1
oms.instanceinfo.retention=1
####### 缓存配置 #######
oms.instance.metadata.cache.size=1024

View File

@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 资源清理配置 #######
oms.log.retention.local=3
oms.log.retention.remote=3
oms.instanceinfo.retention=3
oms.container.retention.local=3
oms.container.retention.remote=-1
oms.instanceinfo.retention=3
####### 缓存配置 #######
oms.instance.metadata.cache.size=1024

View File

@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 资源清理配置 #######
oms.log.retention.local=7
oms.log.retention.remote=7
oms.instanceinfo.retention=7
oms.container.retention.local=7
oms.container.retention.remote=-1
oms.instanceinfo.retention=3
####### 缓存配置 #######
oms.instance.metadata.cache.size=2048

View File

@ -11,8 +11,10 @@ spring.servlet.multipart.file-size-threshold=0
spring.servlet.multipart.max-file-size=209715200
spring.servlet.multipart.max-request-size=209715200
###### OhMyScheduler 自身配置(该配置只允许存在于 application.properties 文件中) ######
###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ######
# akka ActorSystem 服务端口
oms.akka.port=10086
# 报警服务 bean名称
oms.alarm.bean.names=omsDefaultMailAlarmService
oms.alarm.bean.names=omsDefaultMailAlarmService
# 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_
oms.table-prefix=

View File

@ -20,4 +20,24 @@ akka {
canonical.port = 0
}
}
server-actor-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 128
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -9,11 +9,13 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
@ -36,11 +38,14 @@ public class RepositoryTest {
private OmsLockRepository omsLockRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
/**
* 需要证明批量写入失败后会回滚
*/
@Test
@Transactional
public void testBatchLock() {
List<OmsLockDO> locks = Lists.newArrayList();
@ -52,6 +57,14 @@ public class RepositoryTest {
omsLockRepository.flush();
}
// Verifies that a lock row can be persisted and then removed by its lock name
// (exercises the derived delete query deleteByLockName).
@Test
public void testDeleteLock() {
String lockName = "test-lock";
OmsLockDO lockDO = new OmsLockDO(lockName, NetUtils.getLocalHost(), 10000L);
omsLockRepository.save(lockDO);
omsLockRepository.deleteByLockName(lockName);
}
@Test
public void testSelectCronJobSQL() {
List<JobInfoDO> result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis());
@ -59,6 +72,7 @@ public class RepositoryTest {
}
@Test
@Transactional
public void testUpdate() {
InstanceInfoDO updateEntity = new InstanceInfoDO();
updateEntity.setId(22L);
@ -68,6 +82,7 @@ public class RepositoryTest {
}
@Test
@Transactional
public void testExecuteLogUpdate() {
instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL", "", new Date());
instanceInfoRepository.update4FrequentJob(1586310419650L, 2, 200, new Date());
@ -81,4 +96,20 @@ public class RepositoryTest {
System.out.println(res);
}
// Smoke-tests the derived query findByJobIdInAndStatusIn; only prints the result,
// no assertion — it validates that the generated SQL is well-formed.
@Test
public void testFindByJobIdInAndStatusIn() {
List<Long> res = instanceInfoRepository.findByJobIdInAndStatusIn(Lists.newArrayList(1L, 2L, 3L, 4L), Lists.newArrayList(1, 2, 3, 4, 5));
System.out.println(res);
}
// Smoke-tests the retention-cleanup delete for instance records
// (deleteAllByGmtModifiedBefore with the current time removes all rows).
@Test
public void testDeleteInstanceInfo() {
instanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
}
// Smoke-tests the retention-cleanup delete for workflow-instance records,
// mirroring testDeleteInstanceInfo for the workflow repository.
@Test
public void testDeleteWorkflowInstanceInfo() {
workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
}
}

View File

@ -33,4 +33,6 @@ oms.log.retention.local=0
oms.log.retention.remote=0
oms.container.retention.local=0
oms.container.retention.remote=0
oms.instanceinfo.retention=0
oms.instanceinfo.retention=0
# 表前缀
#oms.table-prefix=pj_

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.1.3</powerjob.worker.version>
<powerjob.worker.version>3.2.0</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.1.3</powerjob.worker.version>
<powerjob.worker.version>3.2.0</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<powerjob.common.version>3.2.0</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -1,7 +1,10 @@
package com.github.kfcfans.powerjob.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
@ -9,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor;
import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor;
import com.github.kfcfans.powerjob.worker.actors.WorkerActor;
@ -93,10 +97,21 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
int cores = Runtime.getRuntime().availableProcessors();
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(TaskTrackerActor.class)
.withDispatcher("akka.task-tracker-dispatcher")
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)
.withDispatcher("akka.processor-tracker-dispatcher")
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(WorkerActor.class)
.withDispatcher("akka.worker-common-dispatcher")
.withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME);
// 处理系统中产生的异常情况
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME);
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor;
import akka.actor.DeadLetter;
import lombok.extern.slf4j.Slf4j;
/**
 * Actor that handles system-level abnormal events — currently Akka {@link DeadLetter}
 * messages, i.e. messages that could not be delivered to any actor. It is subscribed
 * to the ActorSystem event stream so undeliverable messages become visible in the logs.
 *
 * @author 朱八
 * @since 2020/7/16
 */
@Slf4j
public class TroubleshootingActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeadLetter.class, this::onReceiveDeadLetter)
.build();
}
// Log the dead letter at WARN level; no recovery is attempted — this actor exists
// purely for troubleshooting visibility.
public void onReceiveDeadLetter(DeadLetter dl) {
log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl);
}
}

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OmsWorkerVersion;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
@ -39,6 +40,7 @@ public class WorkerHealthReporter implements Runnable {
heartbeat.setAppName(OhMyWorker.getConfig().getAppName());
heartbeat.setAppId(OhMyWorker.getAppId());
heartbeat.setHeartbeatTime(System.currentTimeMillis());
heartbeat.setVersion(OmsWorkerVersion.getVersion());
// 获取当前加载的容器列表
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());

View File

@ -37,7 +37,7 @@ public class ProcessorBeanFactory {
return (BasicProcessor) clz.getDeclaredConstructor().newInstance();
}catch (Exception e) {
log.error("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e);
log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed, reason is {}", className, e.getMessage());
}
return null;
});

View File

@ -13,7 +13,7 @@ public class PythonProcessor extends ScriptProcessor {
}
@Override
protected String genScriptName(Long instanceId) {
protected String genScriptName() {
return String.format("python_%d.py", instanceId);
}

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class ScriptProcessor implements BasicProcessor {
private final Long instanceId;
protected final Long instanceId;
// 脚本绝对路径
private final String scriptPath;
private final long timeout;
@ -33,7 +34,7 @@ public abstract class ScriptProcessor implements BasicProcessor {
public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception {
this.instanceId = instanceId;
this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(instanceId);
this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName();
this.timeout = timeout;
File script = new File(scriptPath);
@ -66,6 +67,10 @@ public abstract class ScriptProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("SYSTEM===> ScriptProcessor start to process");
// 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
@ -80,40 +85,45 @@ public abstract class ScriptProcessor implements BasicProcessor {
// 为了代码优雅而牺牲那么一点点点点点点点点性能
// 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题想来想去还是直接创建算了
new Thread(() -> copyStream(process.getInputStream(), inputSB)).start();
new Thread(() -> copyStream(process.getErrorStream(), errorSB)).start();
new Thread(() -> copyStream(process.getInputStream(), inputSB, omsLogger)).start();
new Thread(() -> copyStream(process.getErrorStream(), errorSB, omsLogger)).start();
try {
boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS);
if (!s) {
omsLogger.info("SYSTEM===> process timeout");
return new ProcessResult(false, "TIMEOUT");
}
String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
log.debug("[ScriptProcessor] process result for instance(instanceId={}) is {}.", instanceId, result);
return new ProcessResult(true, result);
}catch (InterruptedException ie) {
omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted");
return new ProcessResult(false, "Interrupted");
}
}
private void copyStream(InputStream is, StringBuilder sb) {
private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) {
String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
while ((line = br.readLine()) != null) {
sb.append(line);
// 同步到在线日志
omsLogger.info(line);
}
} catch (Exception e) {
log.warn("[ScriptProcessor] copyStream failed.", e);
omsLogger.warn("[ScriptProcessor] copyStream failed.", e);
sb.append("Exception: ").append(e);
}
}
/**
* 生成脚本名称
* @param instanceId 任务实例ID作为文件名称使用JobId会有更改不生效的问题
* @return 文件名称
*/
protected abstract String genScriptName(Long instanceId);
protected abstract String genScriptName();
/**
* 获取运行命令egshell返回 /bin/sh

View File

@ -17,7 +17,7 @@ public class ShellProcessor extends ScriptProcessor {
}
@Override
protected String genScriptName(Long instanceId) {
protected String genScriptName() {
return String.format("shell_%d.sh", instanceId);
}

View File

@ -37,6 +37,10 @@ public class CommonTaskTracker extends TaskTracker {
// 可以是除 ROOT_TASK_ID 的任何数字
private static final String LAST_TASK_ID = "1111";
// 连续上报多次失败后放弃上报视为结果不可达TaskTracker down
private int reportFailedCnt = 0;
private static final int MAX_REPORT_FAILED_THRESHOLD = 5;
protected CommonTaskTracker(ServerScheduleJobReq req) {
super(req);
}
@ -232,6 +236,10 @@ public class CommonTaskTracker extends TaskTracker {
// 服务器未接受上报则等待下次重新上报
if (!serverAccepted) {
if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) {
log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, success, result);
destroy();
}
return;
}

View File

@ -17,7 +17,6 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
@ -121,14 +120,6 @@ public class FrequentTaskTracker extends TaskTracker {
subDetail.setStatus(status.getV());
subDetail.setSubInstanceId(subId);
// 设置时间
subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), OmsConstant.TIME_PATTERN));
if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) {
subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), OmsConstant.TIME_PATTERN));
}else {
subDetail.setFinishedTime("N/A");
}
history.add(subDetail);
});

View File

@ -20,4 +20,47 @@ akka {
canonical.port = 25520
}
}
# dispatcher
task-tracker-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
processor-tracker-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 64
}
throughput = 10
}
worker-common-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 8
}
throughput = 10
}
}