From 593ee714b8988a7ba73a90ecc8c7d6911a2c0132 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 17 Apr 2020 19:46:19 +0800 Subject: [PATCH] optimize InstanceDetail & fix some bug --- .DS_Store | Bin 6148 -> 6148 bytes .../kfcfans/common/model/InstanceDetail.java | 22 +++-- .../kfcfans/common/model/SystemMetrics.java | 3 +- .../kfcfans/common/response/AskResponse.java | 39 ++++++-- .../kfcfans/common/utils/JsonUtils.java | 18 ++++ .../oms/server/akka/actors/FriendActor.java | 9 +- .../oms/server/akka/actors/ServerActor.java | 4 +- .../repository/InstanceLogRepository.java | 10 ++- .../oms/server/service/DispatchService.java | 6 +- .../service/instance/InstanceService.java | 4 +- .../timing/InstanceStatusCheckService.java | 13 ++- .../web/controller/InstanceController.java | 10 ++- .../web/controller/SystemInfoController.java | 21 +++-- .../server/web/response/InstanceLogVO.java | 11 +-- .../oms/server/test/RepositoryTest.java | 2 +- oh-my-scheduler-worker-samples/pom.xml | 56 ++++++++++++ .../oms/server/OhMySchedulerConfig.java | 38 ++++++++ .../kfcfans/oms/server/SampleApplication.java | 19 ++++ .../processors/BroadcastProcessorDemo.java | 52 +++++++++++ .../processors/MapReduceProcessorDemo.java | 84 ++++++++++++++++++ .../processors/StandaloneProcessorDemo.java | 30 +++++++ .../src/main/resources/application.properties | 1 + oh-my-scheduler-worker/pom.xml | 1 - .../oms/worker/actors/TaskTrackerActor.java | 11 ++- .../core/processor/sdk/MapProcessor.java | 4 +- .../core/tracker/task/CommonTaskTracker.java | 2 +- .../tracker/task/FrequentTaskTracker.java | 18 +++- .../worker/core/tracker/task/TaskTracker.java | 7 +- .../oms/worker/persistence/TaskDAO.java | 5 ++ .../oms/worker/persistence/TaskDAOImpl.java | 16 ++++ .../persistence/TaskPersistenceService.java | 14 ++- pom.xml | 35 +++++++- 32 files changed, 491 insertions(+), 74 deletions(-) create mode 100644 oh-my-scheduler-worker-samples/pom.xml create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/SampleApplication.java create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java create mode 100644 oh-my-scheduler-worker-samples/src/main/resources/application.properties diff --git a/.DS_Store b/.DS_Store index d136aede8fe136448471e931097de04498d09aea..a7bbb62e0cd78a8b1fac2126e9ef0c866ecfb549 100644 GIT binary patch delta 89 zcmZoMXfc@J&&abeU^g=(&t@JLc_v2F$vSLJjB=CLv&m2HW#!dj&}ArQNMy)mC}7B8 pNM$JY%*jtq%E?b+U|$6tN`3e61G diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java index ed3e4b61..4909b8a9 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java @@ -1,8 +1,11 @@ package com.github.kfcfans.common.model; +import com.github.kfcfans.common.OmsSerializable; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; +import java.util.List; /** * 任务实例的运行详细信息(对外) @@ -11,7 +14,8 @@ import java.io.Serializable; * @since 2020/4/11 */ @Data -public class InstanceDetail implements Serializable { +@NoArgsConstructor +public class InstanceDetail implements OmsSerializable { // 任务整体开始时间 private Long actualTriggerTime; @@ -24,21 +28,27 @@ public class InstanceDetail implements Serializable { // TaskTracker地址 private String taskTrackerAddress; - private Object extra; + // MR或BD任务专用 + private TaskDetail taskDetail; + // 秒级任务专用 + private List subInstanceDetails; // 秒级任务的 extra -> List @Data - public static class SubInstanceDetail implements Serializable { - private long startTime; - private long finishedTime; + @NoArgsConstructor + public static class SubInstanceDetail implements OmsSerializable { + private long subInstanceId; + private String startTime; + private String finishedTime; private String result; private String status; } // MapReduce 和 Broadcast 任务的 extra -> @Data - public static class TaskDetail implements Serializable { + @NoArgsConstructor + public static class TaskDetail implements OmsSerializable { private long totalTaskNum; private long succeedTaskNum; private long failedTaskNum; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java index 761c53bc..8774cc8d 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java @@ -1,5 +1,6 @@ package com.github.kfcfans.common.model; +import com.github.kfcfans.common.OmsSerializable; import lombok.Data; import java.io.Serializable; @@ -11,7 +12,7 @@ import java.io.Serializable; * @since 2020/3/25 */ @Data -public class SystemMetrics implements Serializable, Comparable { +public class SystemMetrics implements OmsSerializable, Comparable { // CPU核心数量 private int cpuProcessors; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java index a08667fc..8600f26f 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java @@ -1,9 +1,8 @@ package com.github.kfcfans.common.response; import com.github.kfcfans.common.OmsSerializable; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; +import com.github.kfcfans.common.utils.JsonUtils; +import lombok.*; /** @@ -16,10 +15,36 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AllArgsConstructor public class AskResponse implements OmsSerializable { - private boolean success; - private Object extra; - public AskResponse(boolean success) { - this.success = success; + private boolean success; + + /* + - 使用 Object 会报错:java.lang.ClassCastException: scala.collection.immutable.HashMap cannot be cast to XXX,只能自己序列化反序列化了 + - 嵌套类型(比如 Map),如果B也是个复杂对象,那么反序列化后B的类型为 LinkedHashMap... 处理比较麻烦(转成JSON再转回来) + */ + private byte[] data; + + // 错误信息 + private String message; + + public static AskResponse succeed(Object data) { + AskResponse r = new AskResponse(); + r.success = true; + if (data != null) { + r.data = JsonUtils.toBytes(data); + } + return r; } + + public static AskResponse failed(String msg) { + AskResponse r = new AskResponse(); + r.success = false; + r.message = msg; + return r; + } + + public T getData(Class clz) throws Exception { + return JsonUtils.parseObject(data, clz); + } + } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/JsonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/JsonUtils.java index ff7ca3b2..9b0ca1b4 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/JsonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/JsonUtils.java @@ -1,6 +1,8 @@ package com.github.kfcfans.common.utils; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -13,6 +15,10 @@ public class JsonUtils { private static final ObjectMapper objectMapper = new ObjectMapper(); + static { + + } + public static String toJSONString(Object obj) { try { return objectMapper.writeValueAsString(obj); @@ -21,7 +27,19 @@ public class JsonUtils { return null; } + public static byte[] toBytes(Object obj) { + try { + return objectMapper.writeValueAsBytes(obj); + }catch (Exception ignore) { + } + return null; + } + public static T parseObject(String json, Class clz) throws JsonProcessingException { return objectMapper.readValue(json, clz); } + + public static T parseObject(byte[] b, Class clz) throws Exception { + return objectMapper.readValue(b, clz); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java index 1667fea9..cf9e7bcf 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java @@ -31,10 +31,7 @@ public class FriendActor extends AbstractActor { * 处理存活检测的请求 */ private void onReceivePing(Ping ping) { - AskResponse askResponse = new AskResponse(); - askResponse.setSuccess(true); - askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime()); - getSender().tell(askResponse, getSelf()); + getSender().tell(AskResponse.succeed(System.currentTimeMillis() - ping.getCurrentTime()), getSelf()); } /** @@ -42,9 +39,7 @@ public class FriendActor extends AbstractActor { */ private void onReceiveFriendQueryWorkerClusterStatusReq(FriendQueryWorkerClusterStatusReq req) { Map workerInfo = WorkerManagerService.getActiveWorkerInfo(req.getAppId()); - AskResponse askResponse = new AskResponse(); - askResponse.setSuccess(true); - askResponse.setExtra(workerInfo); + AskResponse askResponse = AskResponse.succeed(workerInfo); getSender().tell(askResponse, getSelf()); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java index 0da92ce0..18f8e94e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java @@ -46,9 +46,7 @@ public class ServerActor extends AbstractActor { InstanceManager.updateStatus(req); // 回复接收成功 - AskResponse askResponse = new AskResponse(); - askResponse.setSuccess(true); - getSender().tell(askResponse, getSelf()); + getSender().tell(AskResponse.succeed(null), getSelf()); }catch (Exception e) { log.error("[ServerActor] update instance status failed for request: {}.", req, e); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java index 144d2b37..6665b0db 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java @@ -39,8 +39,14 @@ public interface InstanceLogRepository extends JpaRepository jobInfo.getMaxInstanceNum()) { String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount); - instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result); + instanceLogRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result); return; } @@ -85,7 +85,7 @@ public class DispatchService { if (CollectionUtils.isEmpty(finalWorkers)) { String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription); - instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); + instanceLogRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } @@ -120,6 +120,6 @@ public class DispatchService { log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); // 修改状态 - instanceLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, EMPTY_RESULT); + instanceLogRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index 21615b2e..6f0bc701 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -114,9 +114,9 @@ public class InstanceService { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (askResponse.isSuccess()) { - return (InstanceDetail) askResponse.getExtra(); + return askResponse.getData(InstanceDetail.class); }else { - log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getExtra()); + log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getMessage()); } }catch (Exception e) { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index 7c0c8b0b..4b5fe231 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -106,18 +106,12 @@ public class InstanceStatusCheckService { TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); JobStatus jobStatus = JobStatus.of(jobInfoDO.getStatus()); - // 如果任务已关闭,则不进行重试,将任务置为失败即可 - if (jobStatus != JobStatus.ENABLE) { + // 如果任务已关闭,则不进行重试,将任务置为失败即可;秒级任务也直接置为失败,由派发器重新调度 + if (jobStatus != JobStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { updateFailedInstance(instance); return; } - // 秒级任务,无限重试,直接派发 - if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) { - dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); - return; - } - // CRON 和 API一样,失败次数 + 1,根据重试配置进行重试 if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) { dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); @@ -134,6 +128,9 @@ public class InstanceStatusCheckService { * 处理上报超时而失败的任务实例 */ private void updateFailedInstance(InstanceLogDO instance) { + + log.warn("[InstanceStatusCheckService] detected instance(instanceId={},jobId={})'s TaskTracker report timeout,this instance is considered a failure.", instance.getInstanceId(), instance.getJobId()); + instance.setStatus(InstanceStatus.FAILED.getV()); instance.setFinishedTime(System.currentTimeMillis()); instance.setGmtModified(new Date()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index 44272525..42fa66e8 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -49,8 +49,8 @@ public class InstanceController { } @GetMapping("/status") - public ResultDTO getRunningStatus(Long instanceId) { - return ResultDTO.success(instanceService.getInstanceDetail(instanceId)); + public ResultDTO getRunningStatus(String instanceId) { + return ResultDTO.success(instanceService.getInstanceDetail(Long.valueOf(instanceId))); } @PostMapping("/list") @@ -79,10 +79,14 @@ public class InstanceController { BeanUtils.copyProperties(instanceLogDO, instanceLogVO); // 状态转化为中文 - instanceLogVO.setStatus(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); + instanceLogVO.setStatusStr(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); // 额外设置任务名称,提高可读性 instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId())); + // ID 转化为 String(JS精度丢失) + instanceLogVO.setJobId(instanceLogDO.getJobId().toString()); + instanceLogVO.setInstanceId(instanceLogDO.getInstanceId().toString()); + // 格式化时间 if (instanceLogDO.getActualTriggerTime() == null) { instanceLogVO.setActualTriggerTime("N/A"); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java index 5a383657..85db6d42 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java @@ -7,6 +7,7 @@ import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.model.SystemMetrics; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.ResultDTO; +import com.github.kfcfans.common.utils.JsonUtils; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; @@ -16,6 +17,7 @@ import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.web.response.SystemOverviewVO; import com.github.kfcfans.oms.server.web.response.WorkerStatusVO; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; @@ -34,6 +36,7 @@ import java.util.concurrent.TimeUnit; * @author tjq * @since 2020/4/14 */ +@Slf4j @RestController @RequestMapping("/system") public class SystemInfoController { @@ -46,7 +49,7 @@ public class SystemInfoController { private InstanceLogRepository instanceLogRepository; @GetMapping("/listWorker") - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public ResultDTO> listWorker(Long appId) { Optional appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { @@ -67,16 +70,22 @@ public class SystemInfoController { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (askResponse.isSuccess()) { - Map address2Info = (Map) askResponse.getExtra(); + Map address2Info = askResponse.getData(Map.class); List result = Lists.newLinkedList(); - address2Info.forEach((address, metrics) -> { - WorkerStatusVO info = new WorkerStatusVO(address, metrics); - result.add(info); + address2Info.forEach((address, m) -> { + try { + SystemMetrics metrics = JsonUtils.parseObject(JsonUtils.toJSONString(m), SystemMetrics.class); + WorkerStatusVO info = new WorkerStatusVO(String.valueOf(address), metrics); + result.add(info); + }catch (Exception e) { + e.printStackTrace(); + } }); return ResultDTO.success(result); } - return ResultDTO.failed(String.valueOf(askResponse.getExtra())); + return ResultDTO.failed(askResponse.getMessage()); }catch (Exception e) { + log.error("[SystemInfoController] listWorker for appId:{} failed.", appId, e); return ResultDTO.failed("no worker or server available"); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java index e8de401e..1aa53c2e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java @@ -13,12 +13,12 @@ import java.util.Date; @Data public class InstanceLogVO { - // 任务ID - private Long jobId; + // 任务ID(JS精度丢失) + private String jobId; // 任务名称 private String jobName; - // 任务实例ID - private Long instanceId; + // 任务实例ID(JS精度丢失) + private String instanceId; // 执行结果 private String result; @@ -28,9 +28,10 @@ public class InstanceLogVO { // 总共执行的次数(用于重试判断) private Long runningTimes; + private int status; /* ********** 不一致区域 ********** */ - private String status; + private String statusStr; // 实际触发时间(需要格式化为人看得懂的时间) private String actualTriggerTime; // 结束时间(同理,需要格式化) diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java index 5d4f886b..c16bb17c 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java @@ -67,7 +67,7 @@ public class RepositoryTest { @Test public void testExecuteLogUpdate() { - instanceLogRepository.update4Trigger(1586310414570L, 2, 100, System.currentTimeMillis(), "192.168.1.1", "NULL"); + instanceLogRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL"); instanceLogRepository.update4FrequentJob(1586310419650L, 2, 200); } diff --git a/oh-my-scheduler-worker-samples/pom.xml b/oh-my-scheduler-worker-samples/pom.xml new file mode 100644 index 00000000..94cf9530 --- /dev/null +++ b/oh-my-scheduler-worker-samples/pom.xml @@ -0,0 +1,56 @@ + + + + oh-my-scheduler + com.github.kfcfans + 1.0.0 + + 4.0.0 + + oh-my-scheduler-worker-samples + 1.0.0 + + + 2.2.6.RELEASE + 1.0.0 + 1.2.68 + + + + + + + org.springframework.boot + spring-boot-starter-web + ${springboot.version} + + + org.springframework.boot + spring-boot-starter-data-jpa + ${springboot.version} + + + org.springframework.boot + spring-boot-starter-test + ${springboot.version} + test + + + + com.github.kfcfans + oh-my-scheduler-worker + ${oms.worker.version} + + + + com.alibaba + fastjson + ${fastjson.version} + + + + + + \ No newline at end of file diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java new file mode 100644 index 00000000..0914aa60 --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java @@ -0,0 +1,38 @@ +package com.github.kfcfans.oms.server; + +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.OhMyConfig; +import com.github.kfcfans.oms.worker.common.constants.StoreStrategy; +import com.google.common.collect.Lists; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +/** + * OMS-Worker 配置 + * + * @author tjq + * @since 2020/4/17 + */ +@Configuration +public class OhMySchedulerConfig { + + @Bean + public OhMyWorker initOMS() throws Exception { + + List serverAddress = Lists.newArrayList("192.168.1.6:7700", "127.0.0.1:7700"); + + // 1. 创建配置文件 + OhMyConfig config = new OhMyConfig(); + config.setAppName("oms-test"); + config.setServerAddress(serverAddress); + config.setStoreStrategy(StoreStrategy.DISK); + + // 2. 创建 Worker 对象,设置配置文件 + OhMyWorker ohMyWorker = new OhMyWorker(); + ohMyWorker.setConfig(config); + return ohMyWorker; + } + +} diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/SampleApplication.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/SampleApplication.java new file mode 100644 index 00000000..664933ad --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/SampleApplication.java @@ -0,0 +1,19 @@ +package com.github.kfcfans.oms.server; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * 主类 + * + * @author tjq + * @since 2020/4/17 + */ +@EnableScheduling +@SpringBootApplication +public class SampleApplication { + public static void main(String[] args) { + SpringApplication.run(SampleApplication.class, args); + } +} diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java new file mode 100644 index 00000000..9bb526c1 --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java @@ -0,0 +1,52 @@ +package com.github.kfcfans.oms.server.processors; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.TaskResult; +import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * 广播处理器 示例 + * com.github.kfcfans.oms.server.processors.BroadcastProcessorDemo + * + * @author tjq + * @since 2020/4/17 + */ +@Slf4j +public class BroadcastProcessorDemo extends BroadcastProcessor { + + @Override + public ProcessResult preProcess(TaskContext context) throws Exception { + + System.out.println("================ BroadcastProcessorDemo#preProcess ================"); + System.out.println("TaskContext: " + JSONObject.toJSONString(context)); + + boolean success = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(success, context + ": " + success); + } + + @Override + public ProcessResult process(TaskContext context) throws Exception { + System.out.println("================ BroadcastProcessorDemo#process ================"); + System.out.println("TaskContext: " + JSONObject.toJSONString(context)); + + boolean success = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(success, context + ": " + success); + } + + @Override + public ProcessResult postProcess(TaskContext context, List taskResults) throws Exception { + + System.out.println("================ BroadcastProcessorDemo#postProcess ================"); + System.out.println("TaskContext: " + JSONObject.toJSONString(context)); + System.out.println("List: " + JSONObject.toJSONString(taskResults)); + + boolean success = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(success, context + ": " + success); + } +} diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java new file mode 100644 index 00000000..43943ff2 --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java @@ -0,0 +1,84 @@ +package com.github.kfcfans.oms.server.processors; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.common.utils.JsonUtils; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.TaskResult; +import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * MapReduce 处理器示例 + * com.github.kfcfans.oms.server.processors.MapReduceProcessorDemo + * + * @author tjq + * @since 2020/4/17 + */ +@Slf4j +public class MapReduceProcessorDemo extends MapReduceProcessor { + + // 每一批发送任务大小 + private static final int batchSize = 100; + // 发送的批次 + private static final int batchNum = 2; + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + System.out.println("============== TestMapReduceProcessor#process =============="); + System.out.println("isRootTask:" + isRootTask()); + System.out.println("taskContext:" + JsonUtils.toJSONString(context)); + + if (isRootTask()) { + System.out.println("==== MAP ===="); + List subTasks = Lists.newLinkedList(); + for (int j = 0; j < batchNum; j++) { + for (int i = 0; i < batchSize; i++) { + int x = j * batchSize + i; + subTasks.add(new TestSubTask("name" + x, x)); + } + ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK"); + System.out.println("mapResult: " + mapResult); + subTasks.clear(); + } + return new ProcessResult(true, "MAP_SUCCESS"); + }else { + System.out.println("==== NORMAL_PROCESS ===="); + System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); + Thread.sleep(1000); + if (context.getCurrentRetryTimes() == 0) { + return new ProcessResult(false, "FIRST_FAILED"); + }else { + return new ProcessResult(true, "PROCESS_SUCCESS"); + } + } + } + + @Override + public ProcessResult reduce(TaskContext context, List taskResults) { + log.info("================ MapReduceProcessorDemo#postProcess ================"); + log.info("TaskContext: {}", JSONObject.toJSONString(context)); + log.info("List: {}", JSONObject.toJSONString(taskResults)); + + boolean success = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(success, context + ": " + success); + } + + @Getter + @ToString + @NoArgsConstructor + @AllArgsConstructor + private static class TestSubTask { + private String name; + private int age; + } +} diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java new file mode 100644 index 00000000..3a9b88dd --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java @@ -0,0 +1,30 @@ +package com.github.kfcfans.oms.server.processors; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * 单机处理器 示例 + * com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo + * + * @author tjq + * @since 2020/4/17 + */ +@Slf4j +public class StandaloneProcessorDemo implements BasicProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + System.out.println("================ StandaloneProcessorDemo#process ================"); + System.out.println("TaskContext: " + JSONObject.toJSONString(context)); + + boolean success = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(success, context + ": " + success); + } +} diff --git a/oh-my-scheduler-worker-samples/src/main/resources/application.properties b/oh-my-scheduler-worker-samples/src/main/resources/application.properties new file mode 100644 index 00000000..bafddced --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/resources/application.properties @@ -0,0 +1 @@ +server.port=8081 \ No newline at end of file diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 3d08c64b..031ba74c 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -84,5 +84,4 @@ - \ No newline at end of file diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 29fb3c05..746e8d79 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -89,7 +89,8 @@ public class TaskTrackerActor extends AbstractActor { log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e); } - AskResponse response = new AskResponse(success); + AskResponse response = new AskResponse(); + response.setSuccess(success); getSender().tell(response, getSelf()); } @@ -151,16 +152,14 @@ public class TaskTrackerActor extends AbstractActor { * 查询任务实例运行状态 */ private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { - AskResponse askResponse = new AskResponse(); + AskResponse askResponse; TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req); - askResponse.setSuccess(false); - askResponse.setExtra("can't find TaskTracker"); + askResponse = AskResponse.failed("can't find TaskTracker"); }else { InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(); - askResponse.setSuccess(true); - askResponse.setExtra(instanceDetail); + askResponse = AskResponse.succeed(instanceDetail); } getSender().tell(askResponse, getSelf()); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapProcessor.java index 567887d0..8a5fc637 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapProcessor.java @@ -44,7 +44,7 @@ public abstract class MapProcessor implements BasicProcessor { } if (taskList.size() > RECOMMEND_BATCH_SIZE) { - log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks."); + log.warn("[MapProcessor] map task size is too large, network maybe overload... please try to split the tasks."); } TaskDO task = ThreadLocalStore.getTask(); @@ -61,7 +61,7 @@ public abstract class MapProcessor implements BasicProcessor { AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); requestSucceed = respObj.isSuccess(); }catch (Exception e) { - log.warn("[MapReduceProcessor] map failed.", e); + log.warn("[MapProcessor] map failed.", e); } if (requestSucceed) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index dff398b2..77faf9ed 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -73,7 +73,7 @@ public class CommonTaskTracker extends TaskTracker { taskDetail.setSucceedTaskNum(holder.succeedNum); taskDetail.setFailedTaskNum(holder.failedNum); taskDetail.setTotalTaskNum(holder.getTotalTaskNum()); - detail.setExtra(taskDetail); + detail.setTaskDetail(taskDetail); return detail; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index d42fd621..780d0c8d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -20,6 +20,8 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.DateFormatUtils; +import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; @@ -111,15 +113,25 @@ public class FrequentTaskTracker extends TaskTracker { detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); List history = Lists.newLinkedList(); - recentSubInstanceInfo.forEach((ignore, subInstanceInfo) -> { + recentSubInstanceInfo.forEach((subId, subInstanceInfo) -> { InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail(); BeanUtils.copyProperties(subInstanceInfo, subDetail); - subDetail.setStatus(InstanceStatus.of(subInstanceInfo.status).getDes()); + InstanceStatus status = InstanceStatus.of(subInstanceInfo.status); + subDetail.setStatus(status.getDes()); + subDetail.setSubInstanceId(subId); + + // 设置时间 + subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), "yyyy-MM-dd HH:mm:ss")); + if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) { + subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), "yyyy-MM-dd HH:mm:ss")); + }else { + subDetail.setFinishedTime("N/A"); + } history.add(subDetail); }); - detail.setExtra(history); + detail.setSubInstanceDetails(history); return detail; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index add7dc66..5fe3426f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -183,11 +183,8 @@ public abstract class TaskTracker { } // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...) - TaskDO updateEntity = new TaskDO(); - updateEntity.setStatus(nTaskStatus.getValue()); - updateEntity.setResult(result); - updateEntity.setLastReportTime(reportTime); - boolean updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); + result = result == null ? "" : result; + boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); if (!updateResult) { log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index 49ffbd31..baa295ec 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -39,4 +39,9 @@ public interface TaskDAO { */ List getAllTaskResult(Long instanceId, Long subInstanceId) throws SQLException; + /** + * 更新任务状态(result可能出现千奇百怪的字符,比如 ' ,只能特殊定制SQL直接写入) + */ + boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException; + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 6703ecc1..574d2eab 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -163,6 +163,22 @@ public class TaskDAOImpl implements TaskDAO { return taskResults; } + @Override + public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException { + String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?"; + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + + ps.setInt(1, status); + ps.setLong(2, lastReportTime); + ps.setString(3, result); + ps.setLong(4, lastReportTime); + ps.setLong(5, instanceId); + ps.setString(6, taskId); + ps.executeUpdate(); + return true; + } + } + private static TaskDO convert(ResultSet rs) throws SQLException { TaskDO task = new TaskDO(); task.setTaskId(rs.getString("task_id")); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index f3b00210..006a82c1 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -69,7 +69,7 @@ public class TaskPersistenceService { } /** - * 依靠主键更新 Task + * 依靠主键更新 Task(不涉及 result 的,都可以用该方法更新) */ public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) { try { @@ -82,6 +82,18 @@ public class TaskPersistenceService { return false; } + /** + * 更新任务状态 + */ + public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) { + try { + return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result)); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateTaskStatus failed.", e); + } + return false; + } + /** * 更新被派发到已经失联的 ProcessorTracker 的任务,重新执行 * update task_info diff --git a/pom.xml b/pom.xml index acda9ac5..5e5cabe2 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ oh-my-scheduler-server oh-my-scheduler-common oh-my-scheduler-client + oh-my-scheduler-worker-samples pom @@ -19,6 +20,8 @@ 1.8 1.8 1.8 + 3.8.1 + 3.2.1 UTF-8 UTF-8 1.18.12 @@ -32,6 +35,36 @@ ${lombok.version} provided - + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${java.version} + ${java.version} + ${java.version} + ${java.version} + + + + + org.apache.maven.plugins + maven-source-plugin + ${maven-source-plugin.version} + + + attach-sources + + jar + + + + + + \ No newline at end of file