feat: optimize Thread#stop usage

This commit is contained in:
tjq 2023-01-04 00:39:31 +08:00
parent da04e4b048
commit 9f6d421ed2
3 changed files with 52 additions and 15 deletions

View File

@ -24,6 +24,12 @@ public class PowerJobDKey {
public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored";
public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period";
/**
* allowed PowerJob to invoke Thread#stop to kill a thread when PowerJob can't interrupt the thread
* <a href="https://stackoverflow.com/questions/16504140/thread-stop-deprecated">It's VERY dangerous</a>
*/
public static final String WORKER_ALLOWED_FORCE_STOP_THREAD = "powerjob.worker.allowed-force-stop-thread";
/**
* ms
*/

View File

@ -5,7 +5,6 @@ import akka.pattern.Patterns;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
@ -113,7 +112,7 @@ public abstract class TaskTracker {
serverActor.tell(response, null);
}
protected void reportFinalStatus(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) {
protected void reportFinalStatusThenDestory(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) {
// 最终状态需要可靠上报
CompletionStage<Object> ask = Patterns.ask(serverActor, reportInstanceStatusReq, Duration.ofSeconds(15));
boolean serverAccepted = false;

View File

@ -236,6 +236,13 @@ public class LightTaskTracker extends TaskTracker {
}
/*
TODO建议添加 + 销毁判断2种极限场景
1. processTask 调用同时定时任务再次触发理论上定时任务的处理栈上会抛出 InterruptException虽然无实际功能的影响但不够优雅而且总归是问题
2. 先定时任务触发同时 processTask 调用这种情况好像没太大的问题重复上报2次
总而言之建议考虑理论情况下的"并行"场景价格锁和入口状态判断增加代码健壮性
*/
private void checkAndReportStatus() {
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
@ -255,13 +262,13 @@ public class LightTaskTracker extends TaskTracker {
final Thread workerThread = executeThread.get();
if (!finished.get() && workerThread != null) {
// 未能成功打断任务强制停止
log.warn("[TaskTracker-{}] task need stop,but fail to interrupt it,force stop thread {}", instanceId, executeThread.get().getName());
try {
workerThread.stop();
finished.set(true);
log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName());
// 被终止的任务不需要上报状态
destroy();
if (tryForceStopThread(workerThread)) {
finished.set(true);
log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName());
// 被终止的任务不需要上报状态
destroy();
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e);
}
@ -283,7 +290,7 @@ public class LightTaskTracker extends TaskTracker {
reportInstanceStatusReq.setEndTime(taskEndTime);
// 微操一下上报最终状态时重新设置下时间并且增加一小段偏移保证在并发上报运行中状态以及最终状态时最终状态的上报时间晚于运行中的状态
reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1);
reportFinalStatus(serverActor, reportInstanceStatusReq);
reportFinalStatusThenDestory(serverActor, reportInstanceStatusReq);
return;
}
// 未完成的任务只需要上报状态
@ -322,13 +329,13 @@ public class LightTaskTracker extends TaskTracker {
return;
}
// 未能成功打断任务强制终止
log.warn("[TaskTracker-{}] task timeout,but fail to interrupt it,force stop thread {}", instanceId, workerThread.getName());
try {
workerThread.stop();
finished.set(true);
taskEndTime = System.currentTimeMillis();
result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP);
log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName());
if (tryForceStopThread(workerThread)) {
finished.set(true);
taskEndTime = System.currentTimeMillis();
result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP);
log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName());
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] task timeout,fail to stop thread {}", instanceId, workerThread.getName(), e);
}
@ -364,4 +371,29 @@ public class LightTaskTracker extends TaskTracker {
return result.substring(0, maxLength).concat("...");
}
/**
* try force stop thread
* @param thread thread
* @return stop result
*/
private boolean tryForceStopThread(Thread thread) {
String threadName = thread.getName();
String allowStopThread = System.getProperty(PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD);
if (!StringUtils.equalsIgnoreCase(allowStopThread, Boolean.TRUE.toString())) {
log.warn("[TaskTracker-{}] PowerJob not allowed to force stop a thread by config", instanceId);
return false;
}
log.warn("[TaskTracker-{}] fail to interrupt the thread[{}], try to force stop.", instanceId, threadName);
try {
thread.stop();
return true;
} catch (Throwable t) {
log.warn("[TaskTracker-{}] stop thread[{}] failed, msg: {}", instanceId, threadName, t.getMessage());
}
return false;
}
}