Merge branch '4.1.1-monitor' into v4.1.1

tjq 2022-09-12 23:03:38 +08:00
commit 65f2a58d2f
60 changed files with 1320 additions and 293 deletions

View File

@ -20,6 +20,11 @@ public class ServerScheduleJobReq implements PowerSerializable {
*/
private List<String> allWorkerAddress;
/**
* maximum number of worker machines
*/
private Integer maxWorkerCount;
/* *********************** job-related properties *********************** */
/**

View File

@ -15,6 +15,12 @@ import java.util.Map;
@Data
public class TaskTrackerReportInstanceStatusReq implements PowerSerializable {
/**
* additionally report the worker's own appId
* to simplify later monitoring-log instrumentation
*/
private Long appId;
private Long jobId;
private Long instanceId;

View File

@ -21,6 +21,7 @@
<module>powerjob-server-extension</module>
<module>powerjob-server-migrate</module>
<module>powerjob-server-core</module>
<module>powerjob-server-monitor</module>
</modules>
@ -56,6 +57,11 @@
<artifactId>powerjob-server-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-monitor</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-extension</artifactId>

View File

@ -2,6 +2,7 @@ package tech.powerjob.server.common;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.AtomicLong;
@ -16,15 +17,27 @@ public class RejectedExecutionHandlerFactory {
private static final AtomicLong COUNTER = new AtomicLong();
/**
* Reject execution and throw RejectedExecutionException
* @param source name for log
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newAbort(String source) {
return (r, e) -> {
log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " + source);
};
}
/**
* Discard the task directly
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newReject(String source) {
public static RejectedExecutionHandler newDiscard(String source) {
return (r, p) -> {
log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);
};
}
@ -35,8 +48,7 @@ public class RejectedExecutionHandlerFactory {
*/
public static RejectedExecutionHandler newCallerRun(String source) {
return (r, p) -> {
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);
if (!p.isShutdown()) {
r.run();
}
@ -50,8 +62,7 @@ public class RejectedExecutionHandlerFactory {
*/
public static RejectedExecutionHandler newThreadRun(String source) {
return (r, p) -> {
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);
if (!p.isShutdown()) {
String threadName = source + "-T-" + COUNTER.getAndIncrement();
log.info("[{}] create new thread[{}] to run job", source, threadName);
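For context, a minimal sketch of how one of these handlers might be wired into a plain JDK pool; the pool sizes and the "demo-pool" name are illustrative only, not part of this commit:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import tech.powerjob.server.common.RejectedExecutionHandlerFactory;

// illustrative sizes; only the handler wiring mirrors this factory
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 8, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1024),
        RejectedExecutionHandlerFactory.newAbort("demo-pool"));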

View File

@ -14,4 +14,6 @@ public class SJ {
public static final Splitter COMMA_SPLITTER = Splitter.on(",");
public static final Joiner COMMA_JOINER = Joiner.on(",");
public static final Joiner MONITOR_JOINER = Joiner.on("|").useForNull("-");
}
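MONITOR_JOINER renders monitor events as pipe-delimited lines, substituting "-" for null fields; a tiny sketch with made-up values:

// hypothetical values, for illustration only
String line = SJ.MONITOR_JOINER.join("powerjob-worker", 1L, null, "SUCCESS");
// line -> "powerjob-worker|1|-|SUCCESS"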

View File

@ -0,0 +1,10 @@
package tech.powerjob.server.common.aware;
/**
* PowerJobAware
*
* @author tjq
* @since 2022/9/12
*/
public interface PowerJobAware {
}

View File

@ -0,0 +1,14 @@
package tech.powerjob.server.common.aware;
import tech.powerjob.server.common.module.ServerInfo;
/**
* receives the current server info at startup
*
* @author tjq
* @since 2022/9/12
*/
public interface ServerInfoAware extends PowerJobAware {
void setServerInfo(ServerInfo serverInfo);
}
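Any Spring bean implementing ServerInfoAware gets the resolved ServerInfo pushed to it at startup by ServerInfoAwareProcessor (added later in this commit); a hypothetical consumer, for illustration:

import org.springframework.stereotype.Component;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;

// hypothetical bean; LogMonitor in this commit follows the same pattern
@Component
public class ServerIdPrinter implements ServerInfoAware {

    @Override
    public void setServerInfo(ServerInfo serverInfo) {
        // invoked once by ServerInfoAwareProcessor after the server id is resolved
        System.out.println("current serverId = " + serverInfo.getId());
    }
}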

View File

@ -0,0 +1,26 @@
package tech.powerjob.server.common.constants;
/**
* Thread pool name constants
*
* @author tjq
* @since 2022/9/12
*/
public class PJThreadPool {
/**
* thread pool for timed scheduling
*/
public static final String TIMING_POOL = "PowerJobTimingPool";
/**
* async thread pool for background tasks
*/
public static final String BACKGROUND_POOL = "PowerJobBackgroundPool";
/**
* dedicated thread pool for the local database
*/
public static final String LOCAL_DB_POOL = "PowerJobLocalDbPool";
}
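These constants double as the Spring bean names declared in ThreadPoolConfig and as @Async qualifiers; a minimal sketch of the intended usage (the service class and method here are hypothetical):

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import tech.powerjob.server.common.constants.PJThreadPool;

@Service
public class DemoTimingService {

    // runs on the PowerJobTimingPool executor, like CleanService further down
    @Async(PJThreadPool.TIMING_POOL)
    @Scheduled(fixedDelay = 60000)
    public void tick() {
        // periodic work
    }
}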

View File

@ -0,0 +1,19 @@
package tech.powerjob.server.common.module;
import lombok.Data;
/**
* current server info
*
* @author tjq
* @since 2022/9/12
*/
@Data
public class ServerInfo {
private Long id;
private String ip;
private String version = "UNKNOWN";
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.server.common.timewheel.holder;
import tech.powerjob.server.common.timewheel.HashedWheelTimer;
import tech.powerjob.server.common.timewheel.Timer;
/**
* time wheel singletons
@ -11,7 +12,7 @@ import tech.powerjob.server.common.timewheel.HashedWheelTimer;
public class HashedWheelTimerHolder {
// inaccurate time wheel, one tick every 5 seconds
public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0);
public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0);
private HashedWheelTimerHolder() {
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.server.common.timewheel.holder;
import tech.powerjob.server.common.timewheel.HashedWheelTimer;
import tech.powerjob.server.common.timewheel.Timer;
import tech.powerjob.server.common.timewheel.TimerFuture;
import tech.powerjob.server.common.timewheel.TimerTask;
import com.google.common.collect.Maps;
@ -19,9 +20,9 @@ public class InstanceTimeWheelService {
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
// accurate scheduling time wheel, one tick every 1 ms
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// inaccurate scheduling time wheel for high-latency tasks, one tick every 10 seconds
private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
// cancellation is supported; tasks whose interval is below this threshold are not put into CARGO
private static final long MIN_INTERVAL_MS = 1000;

View File

@ -2,6 +2,7 @@ package tech.powerjob.server.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.reflect.MethodSignature;
@ -27,6 +28,10 @@ public class AOPUtils {
private static final ExpressionParser parser = new SpelExpressionParser();
private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
public static String parseRealClassName(JoinPoint joinPoint) {
return joinPoint.getSignature().getDeclaringType().getSimpleName();
}
public static Method parseMethod(ProceedingJoinPoint joinPoint) {
Signature pointSignature = joinPoint.getSignature();
if (!(pointSignature instanceof MethodSignature)) {

View File

@ -146,9 +146,8 @@ public class DispatchService {
// build the job dispatch request
ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
// request delivery is unreliable; a background thread periodically polls the status
WorkerInfo taskTracker = selectTaskTracker(jobInfo, suitableWorkers);
WorkerInfo taskTracker = suitableWorkers.get(0);
String taskTrackerAddress = taskTracker.getAddress();
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
@ -182,6 +181,7 @@ public class DispatchService {
}
req.setInstanceId(instanceInfo.getInstanceId());
req.setAllWorkerAddress(finalWorkersIpList);
req.setMaxWorkerCount(jobInfo.getMaxWorkerCount());
// set the workflow instance ID
req.setWfInstanceId(instanceInfo.getWfInstanceId());
@ -196,17 +196,4 @@ public class DispatchService {
req.setThreadConcurrency(jobInfo.getConcurrency());
return req;
}
private WorkerInfo selectTaskTracker(JobInfoDO jobInfo, List<WorkerInfo> workerInfos) {
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
switch (dispatchStrategy) {
case HEALTH_FIRST:
return workerInfos.get(0);
case RANDOM:
return workerInfos.get(ThreadLocalRandom.current().nextInt(workerInfos.size()));
default:
}
// unreachable in practice; defensive fallback
return workerInfos.get(0);
}
}

View File

@ -0,0 +1,161 @@
package tech.powerjob.server.core.handler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
/**
* monitoring wrapper around IWorkerRequestHandler
*
* @author tjq
* @since 2022/9/11
*/
@Slf4j
public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
@Resource
protected MonitorService monitorService;
@Resource
protected Environment environment;
@Resource
protected ContainerInfoRepository containerInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event);
protected abstract Optional<AskResponse> processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception;
protected abstract void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event);
@Override
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
long startMs = System.currentTimeMillis();
WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
.setAppName(heartbeat.getAppName())
.setAppId(heartbeat.getAppId())
.setVersion(heartbeat.getVersion())
.setProtocol(heartbeat.getProtocol())
.setTag(heartbeat.getTag())
.setWorkerAddress(heartbeat.getWorkerAddress())
.setDelayMs(startMs - heartbeat.getHeartbeatTime())
.setScore(heartbeat.getSystemMetrics().getScore());
processWorkerHeartbeat0(heartbeat, event);
monitorService.monitor(event);
}
@Override
public Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) {
long startMs = System.currentTimeMillis();
TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent()
.setAppId(req.getAppId())
.setJobId(req.getJobId())
.setInstanceId(req.getInstanceId())
.setWfInstanceId(req.getWfInstanceId())
.setInstanceStatus(InstanceStatus.of(req.getInstanceStatus()))
.setDelayMs(startMs - req.getReportTime())
.setServerProcessStatus(TtReportInstanceStatusEvent.Status.SUCCESS);
try {
return processTaskTrackerReportInstanceStatus0(req, event);
} catch (Exception e) {
event.setServerProcessStatus(TtReportInstanceStatusEvent.Status.FAILED);
log.error("[WorkerRequestHandler] processTaskTrackerReportInstanceStatus failed for request: {}", req, e);
return Optional.of(AskResponse.failed(ExceptionUtils.getMessage(e)));
} finally {
event.setServerProcessCost(System.currentTimeMillis() - startMs);
monitorService.monitor(event);
}
}
@Override
public void processWorkerLogReport(WorkerLogReportReq req) {
WorkerLogReportEvent event = new WorkerLogReportEvent()
.setWorkerAddress(req.getWorkerAddress())
.setLogNum(req.getInstanceLogContents().size());
try {
processWorkerLogReport0(req, event);
event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
} catch (RejectedExecutionException re) {
event.setStatus(WorkerLogReportEvent.Status.REJECTED);
} catch (Throwable t) {
event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
log.warn("[WorkerRequestHandler] process worker report failed!", t);
} finally {
monitorService.monitor(event);
}
}
@Override
public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
Long jobId = req.getJobId();
Long appId = req.getAppId();
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
if (jobInfoOpt.isPresent()) {
JobInfoDO jobInfo = jobInfoOpt.get();
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}
}else {
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
}
return askResponse;
}
@Override
public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) {
String port = environment.getProperty("local.server.port");
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
askResponse.setSuccess(false);
askResponse.setMessage("can't find container by id: " + req.getContainerId());
}else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
BeanUtils.copyProperties(containerInfo, dpReq);
dpReq.setContainerId(containerInfo.getId());
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion());
dpReq.setDownloadURL(downloadURL);
askResponse.setData(JsonUtils.toBytes(dpReq));
}
return askResponse;
}
}

View File

@ -0,0 +1,48 @@
package tech.powerjob.server.core.handler;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import java.util.Optional;
/**
* Defines the protocol handled between server and worker
*
* @author tjq
* @since 2022/9/10
*/
public interface IWorkerRequestHandler {
/**
* Process the heartbeat reported by a worker
* @param heartbeat heartbeat info
*/
void processWorkerHeartbeat(WorkerHeartbeat heartbeat);
/**
* Process the instance status reported by a TaskTracker
* @param req report request
* @return response
*/
Optional<AskResponse> processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req);
/**
* Process a worker's query for the executor cluster
* @param req request
* @return cluster info
*/
AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req);
/**
* Process a worker's log report request
* @param req request
*/
void processWorkerLogReport(WorkerLogReportReq req);
/**
* Process a worker's container deployment request
* @param request request
* @return container deployment info
*/
AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request);
}

View File

@ -1,171 +0,0 @@
package tech.powerjob.server.core.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.*;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler;
import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* receive and process worker's request
*
* @author tjq
* @since 2021/2/8
*/
@Slf4j
@Component
public class WorkerRequestHandler {
@Resource
private Environment environment;
@Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
private InstanceLogService instanceLogService;
@Resource
private ContainerInfoRepository containerInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
private static WorkerRequestHandler workerRequestHandler;
@PostConstruct
public void initHandler() {
// init akka
AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
// init vert.x
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
}
/**
* Process a worker's heartbeat request
* @param heartbeat heartbeat packet
*/
public void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
WorkerClusterManagerService.updateStatus(heartbeat);
}
/**
* Process instance status
* @param req instance status report request
*/
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
// 2021/02/05 for instances belonging to a workflow, update the context first and then the instance status; no exception can occur here
if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
// update the workflow context
workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext());
}
instanceManager.updateStatus(req);
// terminal status (success/failure) requires a reply
if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
return Optional.of(AskResponse.succeed(null));
}
return Optional.empty();
}
/**
* Process the OMS online log request
* @param req log request
*/
public void onReceiveWorkerLogReportReq(WorkerLogReportReq req) {
// this should not hurt performance... just a few checks plus Map#get ...
instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
/**
* Process a worker's container deployment request
* @param req container deployment request
*/
public AskResponse onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
String port = environment.getProperty("local.server.port");
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
askResponse.setSuccess(false);
askResponse.setMessage("can't find container by id: " + req.getContainerId());
}else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest();
BeanUtils.copyProperties(containerInfo, dpReq);
dpReq.setContainerId(containerInfo.getId());
String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion());
dpReq.setDownloadURL(downloadURL);
askResponse.setData(JsonUtils.toBytes(dpReq));
}
return askResponse;
}
/**
* Process a worker's request for all processor nodes of the current job
* @param req jobId + appId
*/
public AskResponse onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
Long jobId = req.getJobId();
Long appId = req.getAppId();
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
if (jobInfoOpt.isPresent()) {
JobInfoDO jobInfo = jobInfoOpt.get();
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}
}else {
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
}
return askResponse;
}
public static WorkerRequestHandler getWorkerRequestHandler() {
if (workerRequestHandler == null) {
workerRequestHandler = SpringUtils.getBean(WorkerRequestHandler.class);
}
return workerRequestHandler;
}
}

View File

@ -0,0 +1,28 @@
package tech.powerjob.server.core.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* WorkerRequestHandlerHolder
*
* @author tjq
* @since 2022/9/11
*/
@Component
public class WorkerRequestHandlerHolder {
private static IWorkerRequestHandler workerRequestHandler;
public static IWorkerRequestHandler fetchWorkerRequestHandler() {
return workerRequestHandler;
}
@Autowired
public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) {
WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler;
}
}
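The holder hands the Spring-managed handler to non-Spring code; the Akka and Vert.x handlers further down switch to it. A minimal usage sketch (heartbeat stands for a request already decoded from the wire):

// sketch; mirrors the calls in WorkerRequestAkkaHandler / WorkerRequestHttpHandler below
IWorkerRequestHandler handler = WorkerRequestHandlerHolder.fetchWorkerRequestHandler();
handler.processWorkerHeartbeat(heartbeat);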

View File

@ -0,0 +1,67 @@
package tech.powerjob.server.core.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.request.WorkerHeartbeat;
import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import javax.annotation.Resource;
import java.util.Optional;
/**
* receive and process worker's request
*
* @author tjq
* @since 2022/9/11
*/
@Slf4j
@Component
public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
@Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
private InstanceLogService instanceLogService;
@Override
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
WorkerClusterManagerService.updateStatus(heartbeat);
}
@Override
protected Optional<AskResponse> processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception {
// 2021/02/05 for instances belonging to a workflow, update the context first and then the instance status; no exception can occur here
if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
// update the workflow context
workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext());
}
instanceManager.updateStatus(req);
// terminal status (success/failure) requires a reply
if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) {
return Optional.of(AskResponse.succeed(null));
}
return Optional.empty();
}
@Override
protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
// this should not hurt performance... just a few checks plus Map#get ...
instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}
}

View File

@ -0,0 +1,26 @@
package tech.powerjob.server.core.handler.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.transport.starter.VertXStarter;
import javax.annotation.PostConstruct;
/**
* Initializer for the worker request handlers (Akka & Vert.x)
*
* @author tjq
* @since 2022/9/11
*/
@Component
public class Initializer {
@PostConstruct
public void initHandler() {
// init akka
AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
// init vert.x
VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
}
}

View File

@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler;
import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler;
/**
* Handle worker requests
@ -24,7 +24,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
public static Props defaultProps(){
return Props.create(WorkerRequestAkkaHandler.class)
.withDispatcher("akka.worker-request-actor-dispatcher")
.withDispatcher("akka.w-r-c-d")
.withRouter(
new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)
.withResizer(new DefaultResizer(
@ -42,9 +42,9 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(WorkerHeartbeat.class, hb -> getWorkerRequestHandler().onReceiveWorkerHeartbeat(hb))
.match(WorkerHeartbeat.class, hb -> fetchWorkerRequestHandler().processWorkerHeartbeat(hb))
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req))
.match(WorkerLogReportReq.class, req -> fetchWorkerRequestHandler().processWorkerLogReport(req))
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
.matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj))
@ -71,7 +71,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
try {
Optional<AskResponse> askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
Optional<AskResponse> askResponseOpt = fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
if (askResponseOpt.isPresent()) {
getSender().tell(AskResponse.succeed(null), getSelf());
}
@ -85,7 +85,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
* @param req container deployment request
*/
private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) {
getSender().tell(getWorkerRequestHandler().onReceiveWorkerNeedDeployContainerRequest(req), getSelf());
getSender().tell(fetchWorkerRequestHandler().processWorkerNeedDeployContainer(req), getSelf());
}
/**
@ -94,7 +94,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor {
*/
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
getSender().tell(getWorkerRequestHandler().onReceiveWorkerQueryExecutorClusterReq(req), getSelf());
getSender().tell(fetchWorkerRequestHandler().processWorkerQueryExecutorCluster(req), getSelf());
}
}

View File

@ -21,7 +21,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import java.util.Properties;
import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler;
import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler;
/**
* WorkerRequestHandler
@ -46,14 +46,14 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT)
.handler(ctx -> {
WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat);
fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
success(ctx);
});
router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
.blockingHandler(ctx -> {
TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
try {
getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req);
fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
out(ctx, AskResponse.succeed(null));
} catch (Exception e) {
log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
@ -63,7 +63,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT)
.blockingHandler(ctx -> {
WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
getWorkerRequestHandler().onReceiveWorkerLogReportReq(req);
fetchWorkerRequestHandler().processWorkerLogReport(req);
success(ctx);
});
server.requestHandler(router).listen(port);

View File

@ -1,5 +1,7 @@
package tech.powerjob.server.core.instance;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.TimeExpressionType;
@ -7,6 +9,7 @@ import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.StringPage;
@ -67,7 +70,9 @@ public class InstanceLogService {
* instance IDs whose online logs are maintained locally
*/
private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();
private final ExecutorService workerPool;
@Resource(name = PJThreadPool.BACKGROUND_POOL)
private AsyncTaskExecutor powerJobBackgroundPool;
/**
* segment lock
@ -87,16 +92,12 @@ public class InstanceLogService {
*/
private static final long EXPIRE_INTERVAL_MS = 60000;
public InstanceLogService() {
int coreSize = Runtime.getRuntime().availableProcessors();
workerPool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue());
}
/**
* Submit log records and persist them to the local database
* @param workerAddress address of the reporting machine
* @param logs runtime logs of the task instance
*/
@Async(value = PJThreadPool.LOCAL_DB_POOL)
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
@ -190,7 +191,7 @@ public class InstanceLogService {
* @return async result
*/
private Future<File> prepareLogFile(long instanceId) {
return workerPool.submit(() -> {
return powerJobBackgroundPool.submit(() -> {
// online logs are still being updated; use the data in the local database
if (instanceId2LastReportTime.containsKey(instanceId)) {
return genTemporaryLogFile(instanceId);
@ -203,7 +204,7 @@ public class InstanceLogService {
* Sync the local instance runtime logs to MongoDB storage; executed asynchronously after the instance finishes
* @param instanceId instance ID
*/
@Async("omsBackgroundPool")
@Async(PJThreadPool.BACKGROUND_POOL)
public void sync(Long instanceId) {
Stopwatch sw = Stopwatch.createStarted();
@ -345,7 +346,7 @@ public class InstanceLogService {
}
@Async("omsTimingPool")
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = 120000)
public void timingCheck() {

View File

@ -191,7 +191,7 @@ public class InstanceManager {
log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
// report log data
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS);
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);
// special handling for workflow instances
if (wfInstanceId != null) {

View File

@ -33,13 +33,14 @@ public class InstanceMetadataService implements InitializingBean {
@Value("${oms.instance.metadata.cache.size}")
private int instanceMetadataCacheSize;
private static final int CACHE_CONCURRENCY_LEVEL = 4;
private static final int CACHE_CONCURRENCY_LEVEL = 16;
@Override
public void afterPropertiesSet() throws Exception {
instanceId2JobInfoCache = CacheBuilder.newBuilder()
.concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
.maximumSize(instanceMetadataCacheSize)
.softValues()
.build();
}

View File

@ -11,7 +11,10 @@ import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.utils.AOPUtils;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.lock.SlowLockEvent;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -29,6 +32,9 @@ import java.util.concurrent.locks.ReentrantLock;
@Order(1)
public class UseCacheLockAspect {
@Resource
private MonitorService monitorService;
private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
private static final long SLOW_THRESHOLD = 100;
@ -53,8 +59,19 @@ public class UseCacheLockAspect {
try {
long timeCost = System.currentTimeMillis() - start;
if (timeCost > SLOW_THRESHOLD) {
final SlowLockEvent slowLockEvent = new SlowLockEvent()
.setType(SlowLockEvent.Type.LOCAL)
.setLockType(useCacheLock.type())
.setLockKey(String.valueOf(key))
.setCallerService(method.getDeclaringClass().getSimpleName())
.setCallerMethod(method.getName())
.setCost(timeCost);
monitorService.monitor(slowLockEvent);
log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
useCacheLock.key(),
key,
JSON.toJSONString(point.getArgs()));
}
return point.proceed();

View File

@ -2,6 +2,7 @@ package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
@ -62,7 +63,7 @@ public class CleanService {
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
@Async("omsTimingPool")
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(cron = CLEAN_TIME_EXPRESSION)
public void timingClean() {

View File

@ -4,6 +4,7 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.persistence.remote.model.*;
@ -59,7 +60,7 @@ public class InstanceStatusCheckService {
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Async("omsTimingPool")
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = 10000)
public void timingStatusCheck() {
Stopwatch stopwatch = Stopwatch.createStarted();

View File

@ -3,6 +3,7 @@ package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
@ -73,7 +74,7 @@ public class PowerScheduleService {
private static final long SCHEDULE_RATE = 15000;
@Async("omsTimingPool")
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = SCHEDULE_RATE)
public void timingSchedule() {

View File

@ -1,9 +1,9 @@
package tech.powerjob.server.core.uid;
import tech.powerjob.server.remote.server.ServerInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.server.remote.server.self.ServerInfoService;
/**
* Unique ID generation service, based on the Twitter Snowflake algorithm
@ -23,7 +23,7 @@ public class IdGenerateService {
@Autowired
public IdGenerateService(ServerInfoService serverInfoService) {
long id = serverInfoService.getServerId();
long id = serverInfoService.fetchServiceInfo().getId();
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);
}

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server-monitor</artifactId>
<version>${project.parent.version}</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-common</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,22 @@
package tech.powerjob.server.monitor;
/**
* Monitoring event
*
* @author tjq
* @since 2022/9/6
*/
public interface Event {
/**
* type of the monitoring event
* @return monitor type
*/
String type();
/**
* content of the monitoring event
* @return event content
*/
String message();
}

View File

@ -0,0 +1,21 @@
package tech.powerjob.server.monitor;
/**
* Monitor
*
* @author tjq
* @since 2022/9/6
*/
public interface Monitor {
/**
* bind the global context & initialize
*/
void init();
/**
* Record a monitoring event.
* Note: this method must be asynchronous and non-blocking.
* @param event event
*/
void record(Event event);
}
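Every Monitor bean is picked up by PowerJobMonitorService (next file) through constructor injection, so plugging in another sink only requires another @Component; a hypothetical sketch:

import org.springframework.stereotype.Component;
import tech.powerjob.server.monitor.Event;
import tech.powerjob.server.monitor.Monitor;

// hypothetical extra monitor; LogMonitor further down is the built-in implementation
@Component
public class ConsoleMonitor implements Monitor {

    @Override
    public void init() {
        // bind global context if needed
    }

    @Override
    public void record(Event event) {
        // must stay asynchronous / non-blocking per the contract above;
        // printing is used here purely for illustration
        System.out.println(event.type() + " : " + event.message());
    }
}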

View File

@ -0,0 +1,11 @@
package tech.powerjob.server.monitor;
/**
* Monitoring service exposed to the rest of the server
*
* @author tjq
* @since 2022/9/10
*/
public interface MonitorService {
void monitor(Event event);
}

View File

@ -0,0 +1,35 @@
package tech.powerjob.server.monitor;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* PowerJob server-side monitoring
*
* @author tjq
* @since 2022/9/10
*/
@Slf4j
@Component
public class PowerJobMonitorService implements MonitorService {
private final List<Monitor> monitors = Lists.newLinkedList();
@Autowired
public PowerJobMonitorService(List<Monitor> monitors) {
monitors.forEach(m -> {
log.info("[MonitorService] register monitor: {}", m.getClass().getName());
this.monitors.add(m);
});
}
@Override
public void monitor(Event event) {
monitors.forEach(m -> m.record(event));
}
}

View File

@ -0,0 +1,48 @@
package tech.powerjob.server.monitor.events.db;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.monitor.Event;
/**
* Database operation event
*
* @author tjq
* @since 2022/9/6
*/
@Setter
@Accessors(chain = true)
public class DatabaseEvent implements Event {
private DatabaseType type;
private String serviceName;
private String methodName;
private Status status;
private Integer rows;
private long cost;
private String errorMsg;
private String extra;
public enum Status {
SUCCESS,
FAILED
}
@Override
public String type() {
return "MONITOR_LOGGER_DB_OPERATION";
}
@Override
public String message() {
return SJ.MONITOR_JOINER.join(type, serviceName, methodName, status, rows, cost, errorMsg, extra);
}
}

View File

@ -0,0 +1,22 @@
package tech.powerjob.server.monitor.events.db;
/**
* DatabaseEventType
*
* @author tjq
* @since 2022/9/6
*/
public enum DatabaseType {
/**
* local storage (H2)
*/
LOCAL,
/**
* remote core database
*/
CORE,
/**
* extension database
*/
EXTRA
}

View File

@ -0,0 +1,39 @@
package tech.powerjob.server.monitor.events.lock;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.monitor.Event;
/**
* Slow lock-wait event
*
* @author tjq
* @since 2022/9/9
*/
@Setter
@Accessors(chain = true)
public class SlowLockEvent implements Event {
private Type type;
private String lockType;
private String lockKey;
private String callerService;
private String callerMethod;
private long cost;
public enum Type {
LOCAL,
DB
}
@Override
public String type() {
return "MONITOR_LOGGER_SLOW_LOCK";
}
@Override
public String message() {
return SJ.MONITOR_JOINER.join(type, lockType, lockKey, callerService, callerMethod, cost);
}
}

View File

@ -0,0 +1,47 @@
package tech.powerjob.server.monitor.events.w2s;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.monitor.Event;
/**
* TaskTrackerReportInstanceStatus
*
* @author tjq
* @since 2022/9/9
*/
@Setter
@Accessors(chain = true)
public class TtReportInstanceStatusEvent implements Event {
private Long appId;
private Long jobId;
private Long instanceId;
private Long wfInstanceId;
private InstanceStatus instanceStatus;
private Long delayMs;
private Status serverProcessStatus;
private Long serverProcessCost;
public enum Status {
SUCCESS,
FAILED
}
@Override
public String type() {
return "MONITOR_LOGGER_TT_REPORT_STATUS";
}
@Override
public String message() {
return SJ.MONITOR_JOINER.join(appId, jobId, instanceId, wfInstanceId, instanceStatus, delayMs, serverProcessStatus, serverProcessCost);
}
}

View File

@ -0,0 +1,44 @@
package tech.powerjob.server.monitor.events.w2s;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.monitor.Event;
/**
* worker heartbeat monitoring event
*
* @author tjq
* @since 2022/9/9
*/
@Setter
@Accessors(chain = true)
public class WorkerHeartbeatEvent implements Event {
private String appName;
/**
* redundant with appName, but since other monitoring logs key on appId, keeping it here simplifies potential downstream processing
*/
private Long appId;
private String version;
private String protocol;
private String tag;
private String workerAddress;
/**
* delay between the worker's report time and the server's receipt
*/
private long delayMs;
private Integer score;
@Override
public String type() {
return "MONITOR_LOGGER_WORKER_HEART_BEAT";
}
@Override
public String message() {
return SJ.MONITOR_JOINER.join(appName, appId, version, protocol, tag, workerAddress, delayMs, score);
}
}

View File

@ -0,0 +1,48 @@
package tech.powerjob.server.monitor.events.w2s;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.monitor.Event;
/**
* worker log report event
*
* @author tjq
* @since 2022/9/11
*/
@Setter
@Accessors(chain = true)
public class WorkerLogReportEvent implements Event {
private String workerAddress;
/**
* number of log entries
*/
private long logNum;
/**
* total log size, used to gauge I/O pressure
*/
private long logSize;
private Status status;
public enum Status {
SUCCESS,
REJECTED,
EXCEPTION
}
@Override
public String type() {
return "MONITOR_LOGGER_WORKER_LOG_REPORT";
}
@Override
public String message() {
return SJ.MONITOR_JOINER.join(workerAddress, logNum, logSize, status);
}
}

View File

@ -0,0 +1,43 @@
package tech.powerjob.server.monitor.monitors;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.monitor.Event;
import tech.powerjob.server.monitor.Monitor;
/**
* Default system implementation: a log-based monitor.
* Adopters are expected to collect these logs themselves with an ELK-like stack.
*
* @author tjq
* @since 2022/9/6
*/
@Component
public class LogMonitor implements Monitor, ServerInfoAware {
/**
* server startup depends on the DB, and the DB is itself monitored, so serverInfo is inevitably empty for the first few log entries; fall back to a simple non-null default here
*/
private ServerInfo serverInfo = new ServerInfo();
private static final String MDC_KEY_SERVER_ID = "serverId";
@Override
public void init() {
}
@Override
public void record(Event event) {
MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
LoggerFactory.getLogger(event.type()).info(event.message());
}
@Override
public void setServerInfo(ServerInfo serverInfo) {
this.serverInfo = serverInfo;
}
}

View File

@ -23,6 +23,10 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-common</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-monitor</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,90 @@
package tech.powerjob.server.persistence.monitor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.utils.AOPUtils;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.db.DatabaseEvent;
import tech.powerjob.server.monitor.events.db.DatabaseType;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
/**
* Database monitoring aspect
*
* @author tjq
* @since 2022/9/6
*/
@Slf4j
@Aspect
@Component
public class DatabaseMonitorAspect {
@Resource
private MonitorService monitorService;
@Around("execution(* tech.powerjob.server.persistence.remote.repository..*.*(..))")
public Object monitorCoreDB(ProceedingJoinPoint joinPoint) throws Throwable {
return wrapperMonitor(joinPoint, DatabaseType.CORE);
}
@Around("execution(* tech.powerjob.server.persistence.local..*.*(..))")
public Object monitorLocalDB(ProceedingJoinPoint joinPoint) throws Throwable {
return wrapperMonitor(joinPoint, DatabaseType.LOCAL);
}
private Object wrapperMonitor(ProceedingJoinPoint point, DatabaseType type) throws Throwable {
String classNameMini = AOPUtils.parseRealClassName(point);
final String methodName = point.getSignature().getName();
DatabaseEvent event = new DatabaseEvent().setType(type)
.setServiceName(classNameMini)
.setMethodName(methodName)
.setStatus(DatabaseEvent.Status.SUCCESS);
long startTs = System.currentTimeMillis();
try {
final Object ret = point.proceed();
event.setRows(parseEffectRows(ret));
return ret;
} catch (Throwable t) {
event.setErrorMsg(t.getClass().getSimpleName()).setStatus(DatabaseEvent.Status.FAILED);
throw t;
} finally {
long cost = System.currentTimeMillis() - startTs;
monitorService.monitor(event.setCost(cost));
}
}
private static Integer parseEffectRows(Object ret) {
// for performance, check the most frequent cases first
if (ret instanceof Number) {
return ((Number) ret).intValue();
}
if (ret instanceof Optional) {
return ((Optional<?>) ret).isPresent() ? 1 : 0;
}
if (ret instanceof Collection) {
return ((Collection<?>) ret).size();
}
if (ret instanceof Slice) {
return ((Slice<?>) ret).getSize();
}
if (ret instanceof Stream) {
return null;
}
// TODO: change all methods that return objects directly to return Optional
return ret == null ? 0 : 1;
}
}

View File

@ -0,0 +1,19 @@
package tech.powerjob.server.remote.server.self;
import tech.powerjob.server.common.module.ServerInfo;
/**
* ServerInfoService
*
* @author tjq
* @since 2022/9/12
*/
public interface ServerInfoService {
/**
* fetch current server info
* @return ServerInfo
*/
ServerInfo fetchServiceInfo();
}

View File

@ -1,10 +1,13 @@
package tech.powerjob.server.remote.server;
package tech.powerjob.server.remote.server.self;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.info.BuildProperties;
import org.springframework.scheduling.annotation.Async;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.ServerInfoDO;
import tech.powerjob.server.persistence.remote.repository.ServerInfoRepository;
@ -28,37 +31,25 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
public class ServerInfoService {
public class ServerInfoServiceImpl implements ServerInfoService {
private final String ip;
private final long serverId;
private final ServerInfo serverInfo;
private final ServerInfoRepository serverInfoRepository;
private String version = "UNKNOWN";
private static final long MAX_SERVER_CLUSTER_SIZE = 10000;
private static final String SERVER_INIT_LOCK = "server_init_lock";
private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;
public long getServerId() {
return serverId;
}
public String getServerIp() {
return ip;
}
public String getServerVersion() {
return version;
}
@Autowired
public ServerInfoService(LockService lockService, ServerInfoRepository serverInfoRepository) {
public ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {
this.ip = NetUtils.getLocalHost();
this.serverInfo = new ServerInfo();
String ip = NetUtils.getLocalHost();
serverInfo.setIp(ip);
this.serverInfoRepository = serverInfoRepository;
Stopwatch sw = Stopwatch.createStarted();
@ -80,10 +71,11 @@ public class ServerInfoService {
}
if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {
this.serverId = server.getId();
serverInfo.setId(server.getId());
} else {
this.serverId = retryServerId();
serverInfoRepository.updateIdByIp(this.serverId, ip);
long retryServerId = retryServerId();
serverInfo.setId(retryServerId);
serverInfoRepository.updateIdByIp(retryServerId, ip);
}
} catch (Exception e) {
@ -93,12 +85,12 @@ public class ServerInfoService {
lockService.unlock(SERVER_INIT_LOCK);
}
log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverId, sw);
log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);
}
@Scheduled(fixedRate = 15000, initialDelay = 15000)
public void heartbeat() {
serverInfoRepository.updateGmtModifiedByIp(ip, new Date());
serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());
}
@ -142,7 +134,12 @@ public class ServerInfoService {
}
String pomVersion = buildProperties.getVersion();
if (StringUtils.isNotBlank(pomVersion)) {
version = pomVersion;
serverInfo.setVersion(pomVersion);
}
}
@Override
public ServerInfo fetchServiceInfo() {
return serverInfo;
}
}

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.extension.WorkerFilter;
@ -44,8 +45,17 @@ public class WorkerClusterQueryService {
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
// sort by health score
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
switch (dispatchStrategy) {
case RANDOM:
Collections.shuffle(workers);
break;
case HEALTH_FIRST:
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
break;
default:
// do nothing
}
// limit the cluster size; 0 means unlimited
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {

View File

@ -31,6 +31,10 @@
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-common</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-monitor</artifactId>
</dependency>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-server-persistence</artifactId>

View File

@ -9,7 +9,7 @@ import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.remote.server.ServerInfoService;
import tech.powerjob.server.remote.server.self.ServerInfoService;
import javax.annotation.Resource;
@ -39,7 +39,7 @@ public class SwaggerConfig {
.description("Distributed scheduling and computing framework.")
.license("Apache Licence 2")
.termsOfServiceUrl("https://github.com/PowerJob/PowerJob")
.version(serverInfoService.getServerVersion())
.version(serverInfoService.fetchServiceInfo().getVersion())
.build();
return new Docket(DocumentationType.SWAGGER_2)

View File

@ -1,5 +1,7 @@
package tech.powerjob.server.config;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import tech.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
@ -8,14 +10,12 @@ import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import tech.powerjob.server.common.constants.PJThreadPool;
import java.util.concurrent.*;
/**
* Shared thread pool configuration
* omsTimingPool: thread pool for executing timed tasks
* omsBackgroundPool: thread pool for background tasks that are not time-sensitive and can be processed slowly and steadily
* taskScheduler: thread pool for scheduled dispatching
*
* @author tjq
* @since 2020/4/28
@ -25,28 +25,40 @@ import java.util.concurrent.*;
@Configuration
public class ThreadPoolConfig {
@Bean("omsTimingPool")
public Executor getTimingPool() {
@Bean(PJThreadPool.TIMING_POOL)
public TaskExecutor getTimingPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// use SynchronousQueue
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsTimingPool-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTiming"));
executor.setThreadNamePrefix("PJ-TIMING-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun(PJThreadPool.TIMING_POOL));
return executor;
}
@Bean("omsBackgroundPool")
public Executor initBackgroundPool() {
@Bean(PJThreadPool.BACKGROUND_POOL)
public AsyncTaskExecutor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16);
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool"));
executor.setThreadNamePrefix("PJ-BG-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard(PJThreadPool.BACKGROUND_POOL));
return executor;
}
@Bean(PJThreadPool.LOCAL_DB_POOL)
public TaskExecutor initOmsLocalDbPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int tSize = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
executor.setCorePoolSize(tSize);
executor.setMaxPoolSize(tSize);
executor.setQueueCapacity(2048);
executor.setThreadNamePrefix("PJ-LOCALDB-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort(PJThreadPool.LOCAL_DB_POOL));
return executor;
}
@ -55,7 +67,7 @@ public class ThreadPoolConfig {
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setThreadNamePrefix("PowerJobSchedulePool-");
scheduler.setThreadNamePrefix("PJ-WS-");
scheduler.setDaemon(true);
return scheduler;
}
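For context, a minimal sketch of how a component could hand work to one of these pools through Spring's @Async support (the CleanupService class and method below are hypothetical and not part of this changeset; only the PJThreadPool constants and the @EnableAsync configuration come from the code above):
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import tech.powerjob.server.common.constants.PJThreadPool;
@Service
public class CleanupService {
    // runs on the PJ-BG-* threads configured above; overflow is discarded by the newDiscard handler
    @Async(PJThreadPool.BACKGROUND_POOL)
    public void cleanExpiredLogs() {
        // time-insensitive housekeeping work goes here
    }
}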

View File

@ -0,0 +1,29 @@
package tech.powerjob.server.support;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.remote.server.self.ServerInfoService;
import java.util.List;
/**
* ServerInfoAwareProcessor
*
* @author tjq
* @since 2022/9/12
*/
@Slf4j
@Component
public class ServerInfoAwareProcessor {
public ServerInfoAwareProcessor(ServerInfoService serverInfoService, List<ServerInfoAware> awareList) {
final ServerInfo serverInfo = serverInfoService.fetchServiceInfo();
log.info("[ServerInfoAwareProcessor] current server info: {}", serverInfo);
awareList.forEach(aware -> {
aware.setServerInfo(serverInfo);
log.info("[ServerInfoAwareProcessor] set ServerInfo for: {} successfully", aware);
});
}
}
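For context, a minimal sketch of a ServerInfoAware bean that this processor would wire up at startup (the holder class below is hypothetical; only the ServerInfoAware callback and the ServerInfo type come from the code above):
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.aware.ServerInfoAware;
import tech.powerjob.server.common.module.ServerInfo;
@Component
public class ServerInfoHolder implements ServerInfoAware {
    private volatile ServerInfo serverInfo;
    @Override
    public void setServerInfo(ServerInfo serverInfo) {
        // called once by ServerInfoAwareProcessor after the server info is resolved
        this.serverInfo = serverInfo;
    }
    public ServerInfo get() {
        return serverInfo;
    }
}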

View File

@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.multipart.MultipartFile;
import tech.powerjob.server.common.utils.AOPUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -60,8 +61,8 @@ public class WebLogAspect {
}
HttpServletRequest request = requestAttributes.getRequest();
String[] classNameSplit = joinPoint.getSignature().getDeclaringTypeName().split("\\.");
String classNameMini = classNameSplit[classNameSplit.length - 1];
String classNameMini = AOPUtils.parseRealClassName(joinPoint);
String classMethod = classNameMini + "." + joinPoint.getSignature().getName();
// exclude special classes

View File

@ -4,9 +4,10 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.module.ServerInfo;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.server.ServerInfoService;
import tech.powerjob.server.remote.server.self.ServerInfoService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.web.response.SystemOverviewVO;
@ -70,8 +71,7 @@ public class SystemInfoController {
// server time
overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));
SystemOverviewVO.CurrentServerInfo info = new SystemOverviewVO.CurrentServerInfo(serverInfoService.getServerId(), serverInfoService.getServerIp(), serverInfoService.getServerVersion());
overview.setCurrentServerInfo(info);
overview.setServerInfo(serverInfoService.fetchServiceInfo());
return ResultDTO.success(overview);
}

View File

@ -3,6 +3,7 @@ package tech.powerjob.server.web.response;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import tech.powerjob.server.common.module.ServerInfo;
/**
* 系统概览
@ -21,13 +22,5 @@ public class SystemOverviewVO {
// server time
private String serverTime;
private CurrentServerInfo currentServerInfo;
@Getter
@AllArgsConstructor
public static class CurrentServerInfo {
private final long id;
private final String ip;
private final String version;
}
private ServerInfo serverInfo;
}

View File

@ -0,0 +1,132 @@
<?xml version="1.0"?>
<included>
<property name="MONITOR_LOG_PATH" value="${LOG_PATH}/monitors"/>
<property name="ROTATE_PATTERN" value="%d{yyyy-MM-dd}.%i"/>
<property name="MONITOR_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|%X{serverId}|%msg%n"/>
<!-- database -->
<appender name="MONITOR_LOGGER_DB_OPERATION_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${MONITOR_LOG_PATH}/database.log</file>
<encoder>
<pattern>${MONITOR_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${MONITOR_LOG_PATH}/database.log.${ROTATE_PATTERN}</fileNamePattern>
<maxHistory>3</maxHistory>
<maxFileSize>200MB</maxFileSize>
<totalSizeCap>1000MB</totalSizeCap>
</rollingPolicy>
</appender>
<appender name="ASYNC_MONITOR_LOGGER_DB_OPERATION_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
<appender-ref ref="MONITOR_LOGGER_DB_OPERATION_APPENDER"/>
</appender>
<logger name="MONITOR_LOGGER_DB_OPERATION" level="INFO" additivity="false">
<appender-ref ref="ASYNC_MONITOR_LOGGER_DB_OPERATION_APPENDER"/>
</logger>
<!-- TtReportInstanceStatusEvent -->
<appender name="MONITOR_LOGGER_TT_REPORT_STATUS_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${MONITOR_LOG_PATH}/tt_status_report.log</file>
<encoder>
<pattern>${MONITOR_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${MONITOR_LOG_PATH}/tt_status_report.log.${ROTATE_PATTERN}</fileNamePattern>
<maxHistory>3</maxHistory>
<maxFileSize>200MB</maxFileSize>
<totalSizeCap>1000MB</totalSizeCap>
</rollingPolicy>
</appender>
<appender name="ASYNC_MONITOR_LOGGER_TT_REPORT_STATUS_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
<appender-ref ref="MONITOR_LOGGER_TT_REPORT_STATUS_APPENDER"/>
</appender>
<logger name="MONITOR_LOGGER_TT_REPORT_STATUS" level="INFO" additivity="false">
<appender-ref ref="ASYNC_MONITOR_LOGGER_TT_REPORT_STATUS_APPENDER"/>
</logger>
<!-- WorkerHeartbeatEvent -->
<appender name="MONITOR_LOGGER_WORKER_HEART_BEAT_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${MONITOR_LOG_PATH}/worker_heartbeat.log</file>
<encoder>
<pattern>${MONITOR_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${MONITOR_LOG_PATH}/worker_heartbeat.log.${ROTATE_PATTERN}</fileNamePattern>
<maxHistory>3</maxHistory>
<maxFileSize>200MB</maxFileSize>
<totalSizeCap>1000MB</totalSizeCap>
</rollingPolicy>
</appender>
<appender name="ASYNC_MONITOR_LOGGER_WORKER_HEART_BEAT_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
<appender-ref ref="MONITOR_LOGGER_WORKER_HEART_BEAT_APPENDER"/>
</appender>
<logger name="MONITOR_LOGGER_WORKER_HEART_BEAT" level="INFO" additivity="false">
<appender-ref ref="ASYNC_MONITOR_LOGGER_WORKER_HEART_BEAT_APPENDER"/>
</logger>
<!-- WorkerLogReportEvent -->
<appender name="MONITOR_LOGGER_WORKER_LOG_REPORT_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${MONITOR_LOG_PATH}/worker_log_report.log</file>
<encoder>
<pattern>${MONITOR_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${MONITOR_LOG_PATH}/worker_log_report.log.${ROTATE_PATTERN}</fileNamePattern>
<maxHistory>3</maxHistory>
<maxFileSize>200MB</maxFileSize>
<totalSizeCap>1000MB</totalSizeCap>
</rollingPolicy>
</appender>
<appender name="ASYNC_MONITOR_LOGGER_WORKER_LOG_REPORT_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
<appender-ref ref="MONITOR_LOGGER_WORKER_LOG_REPORT_APPENDER"/>
</appender>
<logger name="MONITOR_LOGGER_WORKER_LOG_REPORT" level="INFO" additivity="false">
<appender-ref ref="ASYNC_MONITOR_LOGGER_WORKER_LOG_REPORT_APPENDER"/>
</logger>
<!-- SlowLockEvent -->
<appender name="MONITOR_LOGGER_SLOW_LOCK_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${MONITOR_LOG_PATH}/lock.log</file>
<encoder>
<pattern>${MONITOR_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${MONITOR_LOG_PATH}/lock.log.${ROTATE_PATTERN}</fileNamePattern>
<maxHistory>3</maxHistory>
<maxFileSize>200MB</maxFileSize>
<totalSizeCap>1000MB</totalSizeCap>
</rollingPolicy>
</appender>
<appender name="ASYNC_MONITOR_LOGGER_SLOW_LOCK_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
<appender-ref ref="MONITOR_LOGGER_SLOW_LOCK_APPENDER"/>
</appender>
<logger name="MONITOR_LOGGER_SLOW_LOCK" level="INFO" additivity="false">
<appender-ref ref="ASYNC_MONITOR_LOGGER_SLOW_LOCK_APPENDER"/>
</logger>
</included>
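For context, a minimal sketch of how an event could be emitted to one of these dedicated monitor loggers (the payload below is illustrative; it only assumes the logger names declared above and the %X{serverId} / %msg slots in MONITOR_LOG_PATTERN):
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
public class MonitorLogDemo {
    // resolves to the MONITOR_LOGGER_WORKER_HEART_BEAT logger declared above
    private static final Logger HEARTBEAT_LOGGER = LoggerFactory.getLogger("MONITOR_LOGGER_WORKER_HEART_BEAT");
    public static void main(String[] args) {
        // fills the %X{serverId} slot in MONITOR_LOG_PATTERN
        MDC.put("serverId", "1");
        // the message becomes the %msg part, here a simple pipe-separated payload
        HEARTBEAT_LOGGER.info("powerjob-worker-samples|192.168.1.1:27777|v4.1.1");
        MDC.remove("serverId");
    }
}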

View File

@ -14,6 +14,9 @@
-->
<property name="LOG_PATH" value="${user.home}/powerjob/server/logs"/>
<!-- include other logback configs -->
<include resource="logback-config/powerjob_monitor.xml"/>
<!-- Configuration for ERROR logs. All error logs will write twice. -->
<appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/powerjob-server-error.log</file>

View File

@ -25,7 +25,8 @@ akka {
}
}
worker-request-actor-dispatcher {
# worker-request-core-dispatcher
w-r-c-d {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use

View File

@ -17,10 +17,15 @@ import java.util.Map;
@Slf4j
public class ProcessorTrackerStatusHolder {
private final Long instanceId;
private final Integer maxWorkerCount;
// ProcessorTracker address (IP:Port) -> status
private final Map<String, ProcessorTrackerStatus> address2Status;
public ProcessorTrackerStatusHolder(List<String> allWorkerAddress) {
public ProcessorTrackerStatusHolder(Long instanceId, Integer maxWorkerCount, List<String> allWorkerAddress) {
this.instanceId = instanceId;
this.maxWorkerCount = maxWorkerCount;
address2Status = Maps.newConcurrentMap();
allWorkerAddress.forEach(address -> {
@ -38,7 +43,7 @@ public class ProcessorTrackerStatusHolder {
public ProcessorTrackerStatus getProcessorTrackerStatus(String address) {
// can only happen when a PT heartbeat arrives right before removal and a task is dispatched immediately afterwards (about 0.001% probability)
return address2Status.computeIfAbsent(address, ignore -> {
log.warn("[ProcessorTrackerStatusHolder] unregistered worker: {}", address);
log.warn("[PTStatusHolder-{}] unregistered worker: {}", instanceId, address);
ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus();
processorTrackerStatus.init(address);
return processorTrackerStatus;
@ -90,9 +95,9 @@ public class ProcessorTrackerStatusHolder {
/**
* Register a new executor node
* @param address address of the new executor node
* @return true: 注册成功 / false已存在
* @return true: register successfully / false: already exists
*/
public boolean register(String address) {
private boolean registerOne(String address) {
ProcessorTrackerStatus pts = address2Status.get(address);
if (pts != null) {
return false;
@ -100,9 +105,50 @@ public class ProcessorTrackerStatusHolder {
pts = new ProcessorTrackerStatus();
pts.init(address);
address2Status.put(address, pts);
log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address);
return true;
}
public void register(List<String> workerIpList) {
if (endlessWorkerNum()) {
workerIpList.forEach(this::registerOne);
return;
}
List<String> availableProcessorTrackers = getAvailableProcessorTrackers();
int currentWorkerSize = availableProcessorTrackers.size();
int needMoreNum = maxWorkerCount - currentWorkerSize;
if (needMoreNum <= 0) {
return;
}
log.info("[PTStatusHolder-{}] currentWorkerSize: {}, needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum);
for (String newIp : workerIpList) {
boolean success = registerOne(newIp);
if (success) {
needMoreNum--;
}
if (needMoreNum <= 0) {
return;
}
}
}
/**
* Check whether new executors need to be dynamically registered
* @return true if more workers are needed
*/
public boolean checkNeedMoreWorker() {
if (endlessWorkerNum()) {
return true;
}
return getAvailableProcessorTrackers().size() < maxWorkerCount;
}
private boolean endlessWorkerNum() {
return maxWorkerCount == null || maxWorkerCount == 0;
}
public void remove(List<String> addressList) {
addressList.forEach(address2Status::remove);
}
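For context, a rough illustration of the capped registration behaviour introduced here (the addresses and ids are made up and the demo is assumed to live in the same package as ProcessorTrackerStatusHolder; a maxWorkerCount of null or 0 keeps the old unlimited behaviour):
import com.google.common.collect.Lists;
public class MaxWorkerCountDemo {
    public static void main(String[] args) {
        // instanceId 10086, at most 2 ProcessorTrackers, one address known at construction time
        ProcessorTrackerStatusHolder holder = new ProcessorTrackerStatusHolder(
                10086L, 2, Lists.newArrayList("192.168.1.1:27777"));
        // new addresses are registered only while the available tracker count is below maxWorkerCount
        holder.register(Lists.newArrayList("192.168.1.2:27777", "192.168.1.3:27777"));
        // returns false once the cap is satisfied, so WorkerDetector can skip the server query
        System.out.println("need more workers: " + holder.checkNeedMoreWorker());
    }
}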

View File

@ -158,6 +158,7 @@ public class CommonTaskTracker extends TaskTracker {
log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setAppId(workerRuntime.getAppId());
req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setWfInstanceId(instanceInfo.getWfInstanceId());

View File

@ -7,14 +7,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
@ -339,6 +338,7 @@ public class FrequentTaskTracker extends TaskTracker {
}
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setAppId(workerRuntime.getAppId());
req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setReportTime(System.currentTimeMillis());

View File

@ -124,7 +124,7 @@ public abstract class TaskTracker {
// defensive operation
instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress());
this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
this.finished = new AtomicBoolean(false);
// only tasks inside a workflow are allowed to append context data to the workflow
@ -562,6 +562,13 @@ public abstract class TaskTracker {
protected class WorkerDetector implements Runnable {
@Override
public void run() {
boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);
if (!needMoreWorker) {
return;
}
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
if (StringUtils.isEmpty(serverPath)) {
log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
@ -575,11 +582,7 @@ public abstract class TaskTracker {
}
try {
List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {});
workerList.forEach(address -> {
if (ptStatusHolder.register(address)) {
log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address);
}
});
ptStatusHolder.register(workerList);
} catch (Exception e) {
log.warn("[TaskTracker-{}] detective failed!", instanceId, e);
}