mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
fix: problem of task process in case of task slice exception. #355
This commit is contained in:
parent
ac8e96508c
commit
8663f3b79f
@ -47,18 +47,24 @@ public class TaskDAOImpl implements TaskDAO {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
|
public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
|
||||||
String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
|
String insertSql = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
|
||||||
try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
|
boolean originAutoCommitFlag ;
|
||||||
|
try (Connection conn = connectionFactory.getConnection()) {
|
||||||
for (TaskDO task : tasks) {
|
originAutoCommitFlag = conn.getAutoCommit();
|
||||||
|
conn.setAutoCommit(false);
|
||||||
fillInsertPreparedStatement(task, ps);
|
try ( PreparedStatement ps = conn.prepareStatement(insertSql)) {
|
||||||
ps.addBatch();
|
for (TaskDO task : tasks) {
|
||||||
|
fillInsertPreparedStatement(task, ps);
|
||||||
|
ps.addBatch();
|
||||||
|
}
|
||||||
|
ps.executeBatch();
|
||||||
|
return true;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
conn.rollback();
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
conn.setAutoCommit(originAutoCommitFlag);
|
||||||
}
|
}
|
||||||
|
|
||||||
ps.executeBatch();
|
|
||||||
return true;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,8 @@ import org.junit.jupiter.api.*;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
import static tech.powerjob.worker.core.tracker.task.CommonTaskTracker.ROOT_TASK_ID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* H2 数据库持久化测试
|
* H2 数据库持久化测试
|
||||||
*
|
*
|
||||||
@ -63,6 +65,31 @@ public class PersistenceServiceTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBatchSave(){
|
||||||
|
List<TaskDO> taskList = Lists.newLinkedList();
|
||||||
|
long instanceId = 10086L + ThreadLocalRandom.current().nextInt(2);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
TaskDO task = new TaskDO();
|
||||||
|
taskList.add(task);
|
||||||
|
task.setSubInstanceId(instanceId);
|
||||||
|
task.setInstanceId(instanceId);
|
||||||
|
task.setTaskId(ROOT_TASK_ID + "." + i);
|
||||||
|
task.setFailedCnt(0);
|
||||||
|
task.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
|
||||||
|
task.setTaskName("ROOT_TASK");
|
||||||
|
task.setAddress(NetUtils.getLocalHost());
|
||||||
|
task.setLastModifiedTime(System.currentTimeMillis());
|
||||||
|
task.setCreatedTime(System.currentTimeMillis());
|
||||||
|
task.setLastReportTime(System.currentTimeMillis());
|
||||||
|
task.setResult("");
|
||||||
|
}
|
||||||
|
TaskDO firstTask = taskList.get(0);
|
||||||
|
taskList.add(firstTask);
|
||||||
|
taskPersistenceService.batchSave(taskList);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteAllTasks() {
|
public void testDeleteAllTasks() {
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user