diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java index e88b3051..c0252145 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java @@ -17,4 +17,13 @@ public enum ExecuteType { MAP_REDUCE(3); int v; + + public static ExecuteType of(int v) { + for (ExecuteType type : values()) { + if (type.v == v) { + return type; + } + } + throw new IllegalArgumentException("unknown ExecuteType of " + v); + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java index 95b79a9c..910a02e5 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java @@ -4,7 +4,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; /** - * description + * 任务运行状态 * * @author tjq * @since 2020/3/17 @@ -12,10 +12,14 @@ import lombok.Getter; @Getter @AllArgsConstructor public enum InstanceStatus { - RUNNING(3, "运行中"), - SUCCEED(4, "运行成功"), - FAILED(5, "运行失败"); - private int value; + WAITING_DISPATCH(1, "等待任务派发,任务处理Server时间轮中"), + WAITING_WORKER_RECEIVE(2, "Server已完成任务派发,等待Worker接收"), + RUNNING(3, "Worker接收成功,正在运行任务"), + FAILED(4, "任务运行失败"), + SUCCEED(5, "任务运行成功"), + STOPPED(10, "任务被手动停止"); + + private int v; private String des; } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java index 3dc75c0f..46712a14 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ProcessorType.java @@ -17,4 +17,13 @@ public enum ProcessorType { private int v; private String des; + + public static ProcessorType of(int v) { + for (ProcessorType type : values()) { + if (type.v == v) { + return type; + } + } + throw new IllegalArgumentException("unknown ProcessorType of " + v); + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java index 0ce70df7..8abb9d24 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java @@ -32,6 +32,8 @@ public class SystemMetrics implements Serializable, Comparable { // 缓存分数 private int score; + public static final int MIN_SCORE = 1; + @Override public int compareTo(SystemMetrics that) { return this.calculateScore() - that.calculateScore(); @@ -53,7 +55,7 @@ public class SystemMetrics implements Serializable, Comparable { // 最低运行标准,1G磁盘 & 0.5G内存 & 一个可用的CPU核心 if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 1) { - score = 1; + score = MIN_SCORE; } else { // 磁盘只需要满足最低标准即可 score = (int) (availableMemory * 2 + availableCPUCores); diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index 86b83140..606b0644 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -3,6 +3,7 @@ package com.github.kfcfans.common.request; import lombok.Data; import java.io.Serializable; +import java.util.List; /** * 服务端调度任务请求(一次任务处理的入口) @@ -13,18 +14,16 @@ import java.io.Serializable; @Data public class ServerScheduleJobReq implements Serializable { - // 调度的服务器地址,默认通讯目标 - private String serverAddress; // 可用处理器地址,可能多值,逗号分隔 - private String allWorkerAddress; + private List allWorkerAddress; /* *********************** 任务相关属性 *********************** */ /** * 基础信息 */ - private String jobId; - private String instanceId; + private Long jobId; + private Long instanceId; /** * 任务执行处理器信息 diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java index 06187179..74d62884 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java @@ -13,8 +13,8 @@ import java.io.Serializable; @Data public class TaskTrackerReportInstanceStatusReq implements Serializable { - private String jobId; - private String instanceId; + private Long jobId; + private Long instanceId; private int instanceStatus; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java index 9d88e8a5..786d6361 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java @@ -18,4 +18,8 @@ import java.io.Serializable; public class AskResponse implements Serializable { private boolean success; private Object extra; + + public AskResponse(boolean success) { + this.success = success; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java new file mode 100644 index 00000000..0b2df785 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java @@ -0,0 +1,22 @@ +package com.github.kfcfans.oms.server.common.constans; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 任务状态 + * + * @author tjq + * @since 2020/4/6 + */ +@Getter +@AllArgsConstructor +public enum JobStatus { + + ENABLE(1), + STOPPED(2), + DELETED(99); + + private int v; + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/CronExpression.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/CronExpression.java new file mode 100644 index 00000000..e8f68f74 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/CronExpression.java @@ -0,0 +1,1647 @@ +package com.github.kfcfans.oms.server.common.utils; + +import java.io.Serializable; +import java.text.ParseException; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Locale; +import java.util.Map; +import java.util.SortedSet; +import java.util.StringTokenizer; +import java.util.TimeZone; +import java.util.TreeSet; + +/** + * Provides a parser and evaluator for unix-like cron expressions. Cron + * expressions provide the ability to specify complex time combinations such as + * "At 8:00am every Monday through Friday" or "At 1:30am every + * last Friday of the month". + *

+ * Cron expressions are comprised of 6 required fields and one optional field + * separated by white space. The fields respectively are described as follows: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Field Name Allowed Values Allowed Special Characters
Seconds  + * 0-59  + * , - * /
Minutes  + * 0-59  + * , - * /
Hours  + * 0-23  + * , - * /
Day-of-month  + * 1-31  + * , - * ? / L W
Month  + * 0-11 or JAN-DEC  + * , - * /
Day-of-Week  + * 1-7 or SUN-SAT  + * , - * ? / L #
Year (Optional)  + * empty, 1970-2199  + * , - * /
+ *

+ * The '*' character is used to specify all values. For example, "*" + * in the minute field means "every minute". + *

+ * The '?' character is allowed for the day-of-month and day-of-week fields. It + * is used to specify 'no specific value'. This is useful when you need to + * specify something in one of the two fields, but not the other. + *

+ * The '-' character is used to specify ranges For example "10-12" in + * the hour field means "the hours 10, 11 and 12". + *

+ * The ',' character is used to specify additional values. For example + * "MON,WED,FRI" in the day-of-week field means "the days Monday, + * Wednesday, and Friday". + *

+ * The '/' character is used to specify increments. For example "0/15" + * in the seconds field means "the seconds 0, 15, 30, and 45". And + * "5/15" in the seconds field means "the seconds 5, 20, 35, and + * 50". Specifying '*' before the '/' is equivalent to specifying 0 is + * the value to start with. Essentially, for each field in the expression, there + * is a set of numbers that can be turned on or off. For seconds and minutes, + * the numbers range from 0 to 59. For hours 0 to 23, for days of the month 0 to + * 31, and for months 0 to 11 (JAN to DEC). The "/" character simply helps you turn + * on every "nth" value in the given set. Thus "7/6" in the + * month field only turns on month "7", it does NOT mean every 6th + * month, please note that subtlety. + *

+ * The 'L' character is allowed for the day-of-month and day-of-week fields. + * This character is short-hand for "last", but it has different + * meaning in each of the two fields. For example, the value "L" in + * the day-of-month field means "the last day of the month" - day 31 + * for January, day 28 for February on non-leap years. If used in the + * day-of-week field by itself, it simply means "7" or + * "SAT". But if used in the day-of-week field after another value, it + * means "the last xxx day of the month" - for example "6L" + * means "the last friday of the month". You can also specify an offset + * from the last day of the month, such as "L-3" which would mean the third-to-last + * day of the calendar month. When using the 'L' option, it is important not to + * specify lists, or ranges of values, as you'll get confusing/unexpected results. + *

+ * The 'W' character is allowed for the day-of-month field. This character + * is used to specify the weekday (Monday-Friday) nearest the given day. As an + * example, if you were to specify "15W" as the value for the + * day-of-month field, the meaning is: "the nearest weekday to the 15th of + * the month". So if the 15th is a Saturday, the trigger will fire on + * Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the + * 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th. + * However if you specify "1W" as the value for day-of-month, and the + * 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not + * 'jump' over the boundary of a month's days. The 'W' character can only be + * specified when the day-of-month is a single day, not a range or list of days. + *

+ * The 'L' and 'W' characters can also be combined for the day-of-month + * expression to yield 'LW', which translates to "last weekday of the + * month". + *

+ * The '#' character is allowed for the day-of-week field. This character is + * used to specify "the nth" XXX day of the month. For example, the + * value of "6#3" in the day-of-week field means the third Friday of + * the month (day 6 = Friday and "#3" = the 3rd one in the month). + * Other examples: "2#1" = the first Monday of the month and + * "4#5" = the fifth Wednesday of the month. Note that if you specify + * "#5" and there is not 5 of the given day-of-week in the month, then + * no firing will occur that month. If the '#' character is used, there can + * only be one expression in the day-of-week field ("3#1,6#3" is + * not valid, since there are two expressions). + *

+ * + *

+ * The legal characters and the names of months and days of the week are not + * case sensitive. + * + *

+ * NOTES: + *

    + *
  • Support for specifying both a day-of-week and a day-of-month value is + * not complete (you'll need to use the '?' character in one of these fields). + *
  • + *
  • Overflowing ranges is supported - that is, having a larger number on + * the left hand side than the right. You might do 22-2 to catch 10 o'clock + * at night until 2 o'clock in the morning, or you might have NOV-FEB. It is + * very important to note that overuse of overflowing ranges creates ranges + * that don't make sense and no effort has been made to determine which + * interpretation CronExpression chooses. An example would be + * "0 0 14-6 ? * FRI-MON".
  • + *
+ *

+ * + * + * @author Sharada Jambula, James House + * @author Contributions from Mads Henderson + * @author Refactoring from CronTrigger to CronExpression by Aaron Craven + */ +public final class CronExpression implements Serializable, Cloneable { + + private static final long serialVersionUID = 12423409423L; + + protected static final int SECOND = 0; + protected static final int MINUTE = 1; + protected static final int HOUR = 2; + protected static final int DAY_OF_MONTH = 3; + protected static final int MONTH = 4; + protected static final int DAY_OF_WEEK = 5; + protected static final int YEAR = 6; + protected static final int ALL_SPEC_INT = 99; // '*' + protected static final int NO_SPEC_INT = 98; // '?' + protected static final Integer ALL_SPEC = ALL_SPEC_INT; + protected static final Integer NO_SPEC = NO_SPEC_INT; + + protected static final Map monthMap = new HashMap(20); + protected static final Map dayMap = new HashMap(60); + static { + monthMap.put("JAN", 0); + monthMap.put("FEB", 1); + monthMap.put("MAR", 2); + monthMap.put("APR", 3); + monthMap.put("MAY", 4); + monthMap.put("JUN", 5); + monthMap.put("JUL", 6); + monthMap.put("AUG", 7); + monthMap.put("SEP", 8); + monthMap.put("OCT", 9); + monthMap.put("NOV", 10); + monthMap.put("DEC", 11); + + dayMap.put("SUN", 1); + dayMap.put("MON", 2); + dayMap.put("TUE", 3); + dayMap.put("WED", 4); + dayMap.put("THU", 5); + dayMap.put("FRI", 6); + dayMap.put("SAT", 7); + } + + private final String cronExpression; + private TimeZone timeZone = null; + protected transient TreeSet seconds; + protected transient TreeSet minutes; + protected transient TreeSet hours; + protected transient TreeSet daysOfMonth; + protected transient TreeSet months; + protected transient TreeSet daysOfWeek; + protected transient TreeSet years; + + protected transient boolean lastdayOfWeek = false; + protected transient int nthdayOfWeek = 0; + protected transient boolean lastdayOfMonth = false; + protected transient boolean nearestWeekday = false; + protected transient int lastdayOffset = 0; + protected transient boolean expressionParsed = false; + + public static final int MAX_YEAR = Calendar.getInstance().get(Calendar.YEAR) + 100; + + /** + * Constructs a new CronExpression based on the specified + * parameter. + * + * @param cronExpression String representation of the cron expression the + * new object should represent + * @throws java.text.ParseException + * if the string expression cannot be parsed into a valid + * CronExpression + */ + public CronExpression(String cronExpression) throws ParseException { + if (cronExpression == null) { + throw new IllegalArgumentException("cronExpression cannot be null"); + } + + this.cronExpression = cronExpression.toUpperCase(Locale.US); + + buildExpression(this.cronExpression); + } + + /** + * Constructs a new {@code CronExpression} as a copy of an existing + * instance. + * + * @param expression + * The existing cron expression to be copied + */ + public CronExpression(CronExpression expression) { + /* + * We don't call the other constructor here since we need to swallow the + * ParseException. We also elide some of the sanity checking as it is + * not logically trippable. + */ + this.cronExpression = expression.getCronExpression(); + try { + buildExpression(cronExpression); + } catch (ParseException ex) { + throw new AssertionError(); + } + if (expression.getTimeZone() != null) { + setTimeZone((TimeZone) expression.getTimeZone().clone()); + } + } + + /** + * Indicates whether the given date satisfies the cron expression. Note that + * milliseconds are ignored, so two Dates falling on different milliseconds + * of the same second will always have the same result here. + * + * @param date the date to evaluate + * @return a boolean indicating whether the given date satisfies the cron + * expression + */ + public boolean isSatisfiedBy(Date date) { + Calendar testDateCal = Calendar.getInstance(getTimeZone()); + testDateCal.setTime(date); + testDateCal.set(Calendar.MILLISECOND, 0); + Date originalDate = testDateCal.getTime(); + + testDateCal.add(Calendar.SECOND, -1); + + Date timeAfter = getTimeAfter(testDateCal.getTime()); + + return ((timeAfter != null) && (timeAfter.equals(originalDate))); + } + + /** + * Returns the next date/time after the given date/time which + * satisfies the cron expression. + * + * @param date the date/time at which to begin the search for the next valid + * date/time + * @return the next valid date/time + */ + public Date getNextValidTimeAfter(Date date) { + return getTimeAfter(date); + } + + /** + * Returns the next date/time after the given date/time which does + * not satisfy the expression + * + * @param date the date/time at which to begin the search for the next + * invalid date/time + * @return the next valid date/time + */ + public Date getNextInvalidTimeAfter(Date date) { + long difference = 1000; + + //move back to the nearest second so differences will be accurate + Calendar adjustCal = Calendar.getInstance(getTimeZone()); + adjustCal.setTime(date); + adjustCal.set(Calendar.MILLISECOND, 0); + Date lastDate = adjustCal.getTime(); + + Date newDate; + + //FUTURE_TODO: (QUARTZ-481) IMPROVE THIS! The following is a BAD solution to this problem. Performance will be very bad here, depending on the cron expression. It is, however A solution. + + //keep getting the next included time until it's farther than one second + // apart. At that point, lastDate is the last valid fire time. We return + // the second immediately following it. + while (difference == 1000) { + newDate = getTimeAfter(lastDate); + if(newDate == null) + break; + + difference = newDate.getTime() - lastDate.getTime(); + + if (difference == 1000) { + lastDate = newDate; + } + } + + return new Date(lastDate.getTime() + 1000); + } + + /** + * Returns the time zone for which this CronExpression + * will be resolved. + */ + public TimeZone getTimeZone() { + if (timeZone == null) { + timeZone = TimeZone.getDefault(); + } + + return timeZone; + } + + /** + * Sets the time zone for which this CronExpression + * will be resolved. + */ + public void setTimeZone(TimeZone timeZone) { + this.timeZone = timeZone; + } + + /** + * Returns the string representation of the CronExpression + * + * @return a string representation of the CronExpression + */ + @Override + public String toString() { + return cronExpression; + } + + /** + * Indicates whether the specified cron expression can be parsed into a + * valid cron expression + * + * @param cronExpression the expression to evaluate + * @return a boolean indicating whether the given expression is a valid cron + * expression + */ + public static boolean isValidExpression(String cronExpression) { + + try { + new CronExpression(cronExpression); + } catch (ParseException pe) { + return false; + } + + return true; + } + + public static void validateExpression(String cronExpression) throws ParseException { + + new CronExpression(cronExpression); + } + + + //////////////////////////////////////////////////////////////////////////// + // + // Expression Parsing Functions + // + //////////////////////////////////////////////////////////////////////////// + + protected void buildExpression(String expression) throws ParseException { + expressionParsed = true; + + try { + + if (seconds == null) { + seconds = new TreeSet(); + } + if (minutes == null) { + minutes = new TreeSet(); + } + if (hours == null) { + hours = new TreeSet(); + } + if (daysOfMonth == null) { + daysOfMonth = new TreeSet(); + } + if (months == null) { + months = new TreeSet(); + } + if (daysOfWeek == null) { + daysOfWeek = new TreeSet(); + } + if (years == null) { + years = new TreeSet(); + } + + int exprOn = SECOND; + + StringTokenizer exprsTok = new StringTokenizer(expression, " \t", + false); + + while (exprsTok.hasMoreTokens() && exprOn <= YEAR) { + String expr = exprsTok.nextToken().trim(); + + // throw an exception if L is used with other days of the month + if(exprOn == DAY_OF_MONTH && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) { + throw new ParseException("Support for specifying 'L' and 'LW' with other days of the month is not implemented", -1); + } + // throw an exception if L is used with other days of the week + if(exprOn == DAY_OF_WEEK && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) { + throw new ParseException("Support for specifying 'L' with other days of the week is not implemented", -1); + } + if(exprOn == DAY_OF_WEEK && expr.indexOf('#') != -1 && expr.indexOf('#', expr.indexOf('#') +1) != -1) { + throw new ParseException("Support for specifying multiple \"nth\" days is not implemented.", -1); + } + + StringTokenizer vTok = new StringTokenizer(expr, ","); + while (vTok.hasMoreTokens()) { + String v = vTok.nextToken(); + storeExpressionVals(0, v, exprOn); + } + + exprOn++; + } + + if (exprOn <= DAY_OF_WEEK) { + throw new ParseException("Unexpected end of expression.", + expression.length()); + } + + if (exprOn <= YEAR) { + storeExpressionVals(0, "*", YEAR); + } + + TreeSet dow = getSet(DAY_OF_WEEK); + TreeSet dom = getSet(DAY_OF_MONTH); + + // Copying the logic from the UnsupportedOperationException below + boolean dayOfMSpec = !dom.contains(NO_SPEC); + boolean dayOfWSpec = !dow.contains(NO_SPEC); + + if (!dayOfMSpec || dayOfWSpec) { + if (!dayOfWSpec || dayOfMSpec) { + throw new ParseException( + "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented.", 0); + } + } + } catch (ParseException pe) { + throw pe; + } catch (Exception e) { + throw new ParseException("Illegal cron expression format (" + + e.toString() + ")", 0); + } + } + + protected int storeExpressionVals(int pos, String s, int type) + throws ParseException { + + int incr = 0; + int i = skipWhiteSpace(pos, s); + if (i >= s.length()) { + return i; + } + char c = s.charAt(i); + if ((c >= 'A') && (c <= 'Z') && (!s.equals("L")) && (!s.equals("LW")) && (!s.matches("^L-[0-9]*[W]?"))) { + String sub = s.substring(i, i + 3); + int sval = -1; + int eval = -1; + if (type == MONTH) { + sval = getMonthNumber(sub) + 1; + if (sval <= 0) { + throw new ParseException("Invalid Month value: '" + sub + "'", i); + } + if (s.length() > i + 3) { + c = s.charAt(i + 3); + if (c == '-') { + i += 4; + sub = s.substring(i, i + 3); + eval = getMonthNumber(sub) + 1; + if (eval <= 0) { + throw new ParseException("Invalid Month value: '" + sub + "'", i); + } + } + } + } else if (type == DAY_OF_WEEK) { + sval = getDayOfWeekNumber(sub); + if (sval < 0) { + throw new ParseException("Invalid Day-of-Week value: '" + + sub + "'", i); + } + if (s.length() > i + 3) { + c = s.charAt(i + 3); + if (c == '-') { + i += 4; + sub = s.substring(i, i + 3); + eval = getDayOfWeekNumber(sub); + if (eval < 0) { + throw new ParseException( + "Invalid Day-of-Week value: '" + sub + + "'", i); + } + } else if (c == '#') { + try { + i += 4; + nthdayOfWeek = Integer.parseInt(s.substring(i)); + if (nthdayOfWeek < 1 || nthdayOfWeek > 5) { + throw new Exception(); + } + } catch (Exception e) { + throw new ParseException( + "A numeric value between 1 and 5 must follow the '#' option", + i); + } + } else if (c == 'L') { + lastdayOfWeek = true; + i++; + } + } + + } else { + throw new ParseException( + "Illegal characters for this position: '" + sub + "'", + i); + } + if (eval != -1) { + incr = 1; + } + addToSet(sval, eval, incr, type); + return (i + 3); + } + + if (c == '?') { + i++; + if ((i + 1) < s.length() + && (s.charAt(i) != ' ' && s.charAt(i + 1) != '\t')) { + throw new ParseException("Illegal character after '?': " + + s.charAt(i), i); + } + if (type != DAY_OF_WEEK && type != DAY_OF_MONTH) { + throw new ParseException( + "'?' can only be specified for Day-of-Month or Day-of-Week.", + i); + } + if (type == DAY_OF_WEEK && !lastdayOfMonth) { + int val = daysOfMonth.last(); + if (val == NO_SPEC_INT) { + throw new ParseException( + "'?' can only be specified for Day-of-Month -OR- Day-of-Week.", + i); + } + } + + addToSet(NO_SPEC_INT, -1, 0, type); + return i; + } + + if (c == '*' || c == '/') { + if (c == '*' && (i + 1) >= s.length()) { + addToSet(ALL_SPEC_INT, -1, incr, type); + return i + 1; + } else if (c == '/' + && ((i + 1) >= s.length() || s.charAt(i + 1) == ' ' || s + .charAt(i + 1) == '\t')) { + throw new ParseException("'/' must be followed by an integer.", i); + } else if (c == '*') { + i++; + } + c = s.charAt(i); + if (c == '/') { // is an increment specified? + i++; + if (i >= s.length()) { + throw new ParseException("Unexpected end of string.", i); + } + + incr = getNumericValue(s, i); + + i++; + if (incr > 10) { + i++; + } + checkIncrementRange(incr, type, i); + } else { + incr = 1; + } + + addToSet(ALL_SPEC_INT, -1, incr, type); + return i; + } else if (c == 'L') { + i++; + if (type == DAY_OF_MONTH) { + lastdayOfMonth = true; + } + if (type == DAY_OF_WEEK) { + addToSet(7, 7, 0, type); + } + if(type == DAY_OF_MONTH && s.length() > i) { + c = s.charAt(i); + if(c == '-') { + ValueSet vs = getValue(0, s, i+1); + lastdayOffset = vs.value; + if(lastdayOffset > 30) + throw new ParseException("Offset from last day must be <= 30", i+1); + i = vs.pos; + } + if(s.length() > i) { + c = s.charAt(i); + if(c == 'W') { + nearestWeekday = true; + i++; + } + } + } + return i; + } else if (c >= '0' && c <= '9') { + int val = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, -1, -1, type); + } else { + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(val, s, i); + val = vs.value; + i = vs.pos; + } + i = checkNext(i, s, val, type); + return i; + } + } else { + throw new ParseException("Unexpected character: " + c, i); + } + + return i; + } + + private void checkIncrementRange(int incr, int type, int idxPos) throws ParseException { + if (incr > 59 && (type == SECOND || type == MINUTE)) { + throw new ParseException("Increment > 60 : " + incr, idxPos); + } else if (incr > 23 && (type == HOUR)) { + throw new ParseException("Increment > 24 : " + incr, idxPos); + } else if (incr > 31 && (type == DAY_OF_MONTH)) { + throw new ParseException("Increment > 31 : " + incr, idxPos); + } else if (incr > 7 && (type == DAY_OF_WEEK)) { + throw new ParseException("Increment > 7 : " + incr, idxPos); + } else if (incr > 12 && (type == MONTH)) { + throw new ParseException("Increment > 12 : " + incr, idxPos); + } + } + + protected int checkNext(int pos, String s, int val, int type) + throws ParseException { + + int end = -1; + int i = pos; + + if (i >= s.length()) { + addToSet(val, end, -1, type); + return i; + } + + char c = s.charAt(pos); + + if (c == 'L') { + if (type == DAY_OF_WEEK) { + if(val < 1 || val > 7) + throw new ParseException("Day-of-Week values must be between 1 and 7", -1); + lastdayOfWeek = true; + } else { + throw new ParseException("'L' option is not valid here. (pos=" + i + ")", i); + } + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == 'W') { + if (type == DAY_OF_MONTH) { + nearestWeekday = true; + } else { + throw new ParseException("'W' option is not valid here. (pos=" + i + ")", i); + } + if(val > 31) + throw new ParseException("The 'W' option does not make sense with values larger than 31 (max number of days in a month)", i); + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == '#') { + if (type != DAY_OF_WEEK) { + throw new ParseException("'#' option is not valid here. (pos=" + i + ")", i); + } + i++; + try { + nthdayOfWeek = Integer.parseInt(s.substring(i)); + if (nthdayOfWeek < 1 || nthdayOfWeek > 5) { + throw new Exception(); + } + } catch (Exception e) { + throw new ParseException( + "A numeric value between 1 and 5 must follow the '#' option", + i); + } + + TreeSet set = getSet(type); + set.add(val); + i++; + return i; + } + + if (c == '-') { + i++; + c = s.charAt(i); + int v = Integer.parseInt(String.valueOf(c)); + end = v; + i++; + if (i >= s.length()) { + addToSet(val, end, 1, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v, s, i); + end = vs.value; + i = vs.pos; + } + if (i < s.length() && ((c = s.charAt(i)) == '/')) { + i++; + c = s.charAt(i); + int v2 = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + addToSet(val, end, v2, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v2, s, i); + int v3 = vs.value; + addToSet(val, end, v3, type); + i = vs.pos; + return i; + } else { + addToSet(val, end, v2, type); + return i; + } + } else { + addToSet(val, end, 1, type); + return i; + } + } + + if (c == '/') { + if ((i + 1) >= s.length() || s.charAt(i + 1) == ' ' || s.charAt(i + 1) == '\t') { + throw new ParseException("'/' must be followed by an integer.", i); + } + + i++; + c = s.charAt(i); + int v2 = Integer.parseInt(String.valueOf(c)); + i++; + if (i >= s.length()) { + checkIncrementRange(v2, type, i); + addToSet(val, end, v2, type); + return i; + } + c = s.charAt(i); + if (c >= '0' && c <= '9') { + ValueSet vs = getValue(v2, s, i); + int v3 = vs.value; + checkIncrementRange(v3, type, i); + addToSet(val, end, v3, type); + i = vs.pos; + return i; + } else { + throw new ParseException("Unexpected character '" + c + "' after '/'", i); + } + } + + addToSet(val, end, 0, type); + i++; + return i; + } + + public String getCronExpression() { + return cronExpression; + } + + public String getExpressionSummary() { + StringBuilder buf = new StringBuilder(); + + buf.append("seconds: "); + buf.append(getExpressionSetSummary(seconds)); + buf.append("\n"); + buf.append("minutes: "); + buf.append(getExpressionSetSummary(minutes)); + buf.append("\n"); + buf.append("hours: "); + buf.append(getExpressionSetSummary(hours)); + buf.append("\n"); + buf.append("daysOfMonth: "); + buf.append(getExpressionSetSummary(daysOfMonth)); + buf.append("\n"); + buf.append("months: "); + buf.append(getExpressionSetSummary(months)); + buf.append("\n"); + buf.append("daysOfWeek: "); + buf.append(getExpressionSetSummary(daysOfWeek)); + buf.append("\n"); + buf.append("lastdayOfWeek: "); + buf.append(lastdayOfWeek); + buf.append("\n"); + buf.append("nearestWeekday: "); + buf.append(nearestWeekday); + buf.append("\n"); + buf.append("NthDayOfWeek: "); + buf.append(nthdayOfWeek); + buf.append("\n"); + buf.append("lastdayOfMonth: "); + buf.append(lastdayOfMonth); + buf.append("\n"); + buf.append("years: "); + buf.append(getExpressionSetSummary(years)); + buf.append("\n"); + + return buf.toString(); + } + + protected String getExpressionSetSummary(java.util.Set set) { + + if (set.contains(NO_SPEC)) { + return "?"; + } + if (set.contains(ALL_SPEC)) { + return "*"; + } + + StringBuilder buf = new StringBuilder(); + + Iterator itr = set.iterator(); + boolean first = true; + while (itr.hasNext()) { + Integer iVal = itr.next(); + String val = iVal.toString(); + if (!first) { + buf.append(","); + } + buf.append(val); + first = false; + } + + return buf.toString(); + } + + protected String getExpressionSetSummary(java.util.ArrayList list) { + + if (list.contains(NO_SPEC)) { + return "?"; + } + if (list.contains(ALL_SPEC)) { + return "*"; + } + + StringBuilder buf = new StringBuilder(); + + Iterator itr = list.iterator(); + boolean first = true; + while (itr.hasNext()) { + Integer iVal = itr.next(); + String val = iVal.toString(); + if (!first) { + buf.append(","); + } + buf.append(val); + first = false; + } + + return buf.toString(); + } + + protected int skipWhiteSpace(int i, String s) { + for (; i < s.length() && (s.charAt(i) == ' ' || s.charAt(i) == '\t'); i++) { + ; + } + + return i; + } + + protected int findNextWhiteSpace(int i, String s) { + for (; i < s.length() && (s.charAt(i) != ' ' || s.charAt(i) != '\t'); i++) { + ; + } + + return i; + } + + protected void addToSet(int val, int end, int incr, int type) + throws ParseException { + + TreeSet set = getSet(type); + + if (type == SECOND || type == MINUTE) { + if ((val < 0 || val > 59 || end > 59) && (val != ALL_SPEC_INT)) { + throw new ParseException( + "Minute and Second values must be between 0 and 59", + -1); + } + } else if (type == HOUR) { + if ((val < 0 || val > 23 || end > 23) && (val != ALL_SPEC_INT)) { + throw new ParseException( + "Hour values must be between 0 and 23", -1); + } + } else if (type == DAY_OF_MONTH) { + if ((val < 1 || val > 31 || end > 31) && (val != ALL_SPEC_INT) + && (val != NO_SPEC_INT)) { + throw new ParseException( + "Day of month values must be between 1 and 31", -1); + } + } else if (type == MONTH) { + if ((val < 1 || val > 12 || end > 12) && (val != ALL_SPEC_INT)) { + throw new ParseException( + "Month values must be between 1 and 12", -1); + } + } else if (type == DAY_OF_WEEK) { + if ((val == 0 || val > 7 || end > 7) && (val != ALL_SPEC_INT) + && (val != NO_SPEC_INT)) { + throw new ParseException( + "Day-of-Week values must be between 1 and 7", -1); + } + } + + if ((incr == 0 || incr == -1) && val != ALL_SPEC_INT) { + if (val != -1) { + set.add(val); + } else { + set.add(NO_SPEC); + } + + return; + } + + int startAt = val; + int stopAt = end; + + if (val == ALL_SPEC_INT && incr <= 0) { + incr = 1; + set.add(ALL_SPEC); // put in a marker, but also fill values + } + + if (type == SECOND || type == MINUTE) { + if (stopAt == -1) { + stopAt = 59; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 0; + } + } else if (type == HOUR) { + if (stopAt == -1) { + stopAt = 23; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 0; + } + } else if (type == DAY_OF_MONTH) { + if (stopAt == -1) { + stopAt = 31; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == MONTH) { + if (stopAt == -1) { + stopAt = 12; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == DAY_OF_WEEK) { + if (stopAt == -1) { + stopAt = 7; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1; + } + } else if (type == YEAR) { + if (stopAt == -1) { + stopAt = MAX_YEAR; + } + if (startAt == -1 || startAt == ALL_SPEC_INT) { + startAt = 1970; + } + } + + // if the end of the range is before the start, then we need to overflow into + // the next day, month etc. This is done by adding the maximum amount for that + // type, and using modulus max to determine the value being added. + int max = -1; + if (stopAt < startAt) { + switch (type) { + case SECOND : max = 60; break; + case MINUTE : max = 60; break; + case HOUR : max = 24; break; + case MONTH : max = 12; break; + case DAY_OF_WEEK : max = 7; break; + case DAY_OF_MONTH : max = 31; break; + case YEAR : throw new IllegalArgumentException("Start year must be less than stop year"); + default : throw new IllegalArgumentException("Unexpected type encountered"); + } + stopAt += max; + } + + for (int i = startAt; i <= stopAt; i += incr) { + if (max == -1) { + // ie: there's no max to overflow over + set.add(i); + } else { + // take the modulus to get the real value + int i2 = i % max; + + // 1-indexed ranges should not include 0, and should include their max + if (i2 == 0 && (type == MONTH || type == DAY_OF_WEEK || type == DAY_OF_MONTH) ) { + i2 = max; + } + + set.add(i2); + } + } + } + + TreeSet getSet(int type) { + switch (type) { + case SECOND: + return seconds; + case MINUTE: + return minutes; + case HOUR: + return hours; + case DAY_OF_MONTH: + return daysOfMonth; + case MONTH: + return months; + case DAY_OF_WEEK: + return daysOfWeek; + case YEAR: + return years; + default: + return null; + } + } + + protected ValueSet getValue(int v, String s, int i) { + char c = s.charAt(i); + StringBuilder s1 = new StringBuilder(String.valueOf(v)); + while (c >= '0' && c <= '9') { + s1.append(c); + i++; + if (i >= s.length()) { + break; + } + c = s.charAt(i); + } + ValueSet val = new ValueSet(); + + val.pos = (i < s.length()) ? i : i + 1; + val.value = Integer.parseInt(s1.toString()); + return val; + } + + protected int getNumericValue(String s, int i) { + int endOfVal = findNextWhiteSpace(i, s); + String val = s.substring(i, endOfVal); + return Integer.parseInt(val); + } + + protected int getMonthNumber(String s) { + Integer integer = monthMap.get(s); + + if (integer == null) { + return -1; + } + + return integer; + } + + protected int getDayOfWeekNumber(String s) { + Integer integer = dayMap.get(s); + + if (integer == null) { + return -1; + } + + return integer; + } + + //////////////////////////////////////////////////////////////////////////// + // + // Computation Functions + // + //////////////////////////////////////////////////////////////////////////// + + public Date getTimeAfter(Date afterTime) { + + // Computation is based on Gregorian year only. + Calendar cl = new java.util.GregorianCalendar(getTimeZone()); + + // move ahead one second, since we're computing the time *after* the + // given time + afterTime = new Date(afterTime.getTime() + 1000); + // CronTrigger does not deal with milliseconds + cl.setTime(afterTime); + cl.set(Calendar.MILLISECOND, 0); + + boolean gotOne = false; + // loop until we've computed the next time, or we've past the endTime + while (!gotOne) { + + //if (endTime != null && cl.getTime().after(endTime)) return null; + if(cl.get(Calendar.YEAR) > 2999) { // prevent endless loop... + return null; + } + + SortedSet st = null; + int t = 0; + + int sec = cl.get(Calendar.SECOND); + int min = cl.get(Calendar.MINUTE); + + // get second................................................. + st = seconds.tailSet(sec); + if (st != null && st.size() != 0) { + sec = st.first(); + } else { + sec = seconds.first(); + min++; + cl.set(Calendar.MINUTE, min); + } + cl.set(Calendar.SECOND, sec); + + min = cl.get(Calendar.MINUTE); + int hr = cl.get(Calendar.HOUR_OF_DAY); + t = -1; + + // get minute................................................. + st = minutes.tailSet(min); + if (st != null && st.size() != 0) { + t = min; + min = st.first(); + } else { + min = minutes.first(); + hr++; + } + if (min != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, min); + setCalendarHour(cl, hr); + continue; + } + cl.set(Calendar.MINUTE, min); + + hr = cl.get(Calendar.HOUR_OF_DAY); + int day = cl.get(Calendar.DAY_OF_MONTH); + t = -1; + + // get hour................................................... + st = hours.tailSet(hr); + if (st != null && st.size() != 0) { + t = hr; + hr = st.first(); + } else { + hr = hours.first(); + day++; + } + if (hr != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + setCalendarHour(cl, hr); + continue; + } + cl.set(Calendar.HOUR_OF_DAY, hr); + + day = cl.get(Calendar.DAY_OF_MONTH); + int mon = cl.get(Calendar.MONTH) + 1; + // '+ 1' because calendar is 0-based for this field, and we are + // 1-based + t = -1; + int tmon = mon; + + // get day................................................... + boolean dayOfMSpec = !daysOfMonth.contains(NO_SPEC); + boolean dayOfWSpec = !daysOfWeek.contains(NO_SPEC); + if (dayOfMSpec && !dayOfWSpec) { // get day by day of month rule + st = daysOfMonth.tailSet(day); + if (lastdayOfMonth) { + if(!nearestWeekday) { + t = day; + day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + day -= lastdayOffset; + if(t > day) { + mon++; + if(mon > 12) { + mon = 1; + tmon = 3333; // ensure test of mon != tmon further below fails + cl.add(Calendar.YEAR, 1); + } + day = 1; + } + } else { + t = day; + day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + day -= lastdayOffset; + + java.util.Calendar tcal = java.util.Calendar.getInstance(getTimeZone()); + tcal.set(Calendar.SECOND, 0); + tcal.set(Calendar.MINUTE, 0); + tcal.set(Calendar.HOUR_OF_DAY, 0); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR)); + + int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + int dow = tcal.get(Calendar.DAY_OF_WEEK); + + if(dow == Calendar.SATURDAY && day == 1) { + day += 2; + } else if(dow == Calendar.SATURDAY) { + day -= 1; + } else if(dow == Calendar.SUNDAY && day == ldom) { + day -= 2; + } else if(dow == Calendar.SUNDAY) { + day += 1; + } + + tcal.set(Calendar.SECOND, sec); + tcal.set(Calendar.MINUTE, min); + tcal.set(Calendar.HOUR_OF_DAY, hr); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + Date nTime = tcal.getTime(); + if(nTime.before(afterTime)) { + day = 1; + mon++; + } + } + } else if(nearestWeekday) { + t = day; + day = daysOfMonth.first(); + + java.util.Calendar tcal = java.util.Calendar.getInstance(getTimeZone()); + tcal.set(Calendar.SECOND, 0); + tcal.set(Calendar.MINUTE, 0); + tcal.set(Calendar.HOUR_OF_DAY, 0); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR)); + + int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + int dow = tcal.get(Calendar.DAY_OF_WEEK); + + if(dow == Calendar.SATURDAY && day == 1) { + day += 2; + } else if(dow == Calendar.SATURDAY) { + day -= 1; + } else if(dow == Calendar.SUNDAY && day == ldom) { + day -= 2; + } else if(dow == Calendar.SUNDAY) { + day += 1; + } + + + tcal.set(Calendar.SECOND, sec); + tcal.set(Calendar.MINUTE, min); + tcal.set(Calendar.HOUR_OF_DAY, hr); + tcal.set(Calendar.DAY_OF_MONTH, day); + tcal.set(Calendar.MONTH, mon - 1); + Date nTime = tcal.getTime(); + if(nTime.before(afterTime)) { + day = daysOfMonth.first(); + mon++; + } + } else if (st != null && st.size() != 0) { + t = day; + day = st.first(); + // make sure we don't over-run a short month, such as february + int lastDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + if (day > lastDay) { + day = daysOfMonth.first(); + mon++; + } + } else { + day = daysOfMonth.first(); + mon++; + } + + if (day != t || mon != tmon) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we + // are 1-based + continue; + } + } else if (dayOfWSpec && !dayOfMSpec) { // get day by day of week rule + if (lastdayOfWeek) { // are we looking for the last XXX day of + // the month? + int dow = daysOfWeek.first(); // desired + // d-o-w + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } + if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + + if (day + daysToAdd > lDay) { // did we already miss the + // last one? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } + + // find date of last occurrence of this day in this month... + while ((day + daysToAdd + 7) <= lDay) { + daysToAdd += 7; + } + + day += daysToAdd; + + if (daysToAdd > 0) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' here because we are not promoting the month + continue; + } + + } else if (nthdayOfWeek != 0) { + // are we looking for the Nth XXX day in the month? + int dow = daysOfWeek.first(); // desired + // d-o-w + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } else if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + boolean dayShifted = false; + if (daysToAdd > 0) { + dayShifted = true; + } + + day += daysToAdd; + int weekOfMonth = day / 7; + if (day % 7 > 0) { + weekOfMonth++; + } + + daysToAdd = (nthdayOfWeek - weekOfMonth) * 7; + day += daysToAdd; + if (daysToAdd < 0 + || day > getLastDayOfMonth(mon, cl + .get(Calendar.YEAR))) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } else if (daysToAdd > 0 || dayShifted) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' here because we are NOT promoting the month + continue; + } + } else { + int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w + int dow = daysOfWeek.first(); // desired + // d-o-w + st = daysOfWeek.tailSet(cDow); + if (st != null && st.size() > 0) { + dow = st.first(); + } + + int daysToAdd = 0; + if (cDow < dow) { + daysToAdd = dow - cDow; + } + if (cDow > dow) { + daysToAdd = dow + (7 - cDow); + } + + int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR)); + + if (day + daysToAdd > lDay) { // will we pass the end of + // the month? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon); + // no '- 1' here because we are promoting the month + continue; + } else if (daysToAdd > 0) { // are we swithing days? + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, day + daysToAdd); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, + // and we are 1-based + continue; + } + } + } else { // dayOfWSpec && !dayOfMSpec + throw new UnsupportedOperationException( + "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented."); + } + cl.set(Calendar.DAY_OF_MONTH, day); + + mon = cl.get(Calendar.MONTH) + 1; + // '+ 1' because calendar is 0-based for this field, and we are + // 1-based + int year = cl.get(Calendar.YEAR); + t = -1; + + // test for expressions that never generate a valid fire date, + // but keep looping... + if (year > MAX_YEAR) { + return null; + } + + // get month................................................... + st = months.tailSet(mon); + if (st != null && st.size() != 0) { + t = mon; + mon = st.first(); + } else { + mon = months.first(); + year++; + } + if (mon != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + cl.set(Calendar.YEAR, year); + continue; + } + cl.set(Calendar.MONTH, mon - 1); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + + year = cl.get(Calendar.YEAR); + t = -1; + + // get year................................................... + st = years.tailSet(year); + if (st != null && st.size() != 0) { + t = year; + year = st.first(); + } else { + return null; // ran out of years... + } + + if (year != t) { + cl.set(Calendar.SECOND, 0); + cl.set(Calendar.MINUTE, 0); + cl.set(Calendar.HOUR_OF_DAY, 0); + cl.set(Calendar.DAY_OF_MONTH, 1); + cl.set(Calendar.MONTH, 0); + // '- 1' because calendar is 0-based for this field, and we are + // 1-based + cl.set(Calendar.YEAR, year); + continue; + } + cl.set(Calendar.YEAR, year); + + gotOne = true; + } // while( !done ) + + return cl.getTime(); + } + + /** + * Advance the calendar to the particular hour paying particular attention + * to daylight saving problems. + * + * @param cal the calendar to operate on + * @param hour the hour to set + */ + protected void setCalendarHour(Calendar cal, int hour) { + cal.set(java.util.Calendar.HOUR_OF_DAY, hour); + if (cal.get(java.util.Calendar.HOUR_OF_DAY) != hour && hour != 24) { + cal.set(java.util.Calendar.HOUR_OF_DAY, hour + 1); + } + } + + /** + * NOT YET IMPLEMENTED: Returns the time before the given time + * that the CronExpression matches. + */ + public Date getTimeBefore(Date endTime) { + // FUTURE_TODO: implement QUARTZ-423 + return null; + } + + /** + * NOT YET IMPLEMENTED: Returns the final time that the + * CronExpression will match. + */ + public Date getFinalFireTime() { + // FUTURE_TODO: implement QUARTZ-423 + return null; + } + + protected boolean isLeapYear(int year) { + return ((year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)); + } + + protected int getLastDayOfMonth(int monthNum, int year) { + + switch (monthNum) { + case 1: + return 31; + case 2: + return (isLeapYear(year)) ? 29 : 28; + case 3: + return 31; + case 4: + return 30; + case 5: + return 31; + case 6: + return 30; + case 7: + return 31; + case 8: + return 31; + case 9: + return 30; + case 10: + return 31; + case 11: + return 30; + case 12: + return 31; + default: + throw new IllegalArgumentException("Illegal month number: " + + monthNum); + } + } + + + private void readObject(java.io.ObjectInputStream stream) + throws java.io.IOException, ClassNotFoundException { + + stream.defaultReadObject(); + try { + buildExpression(cronExpression); + } catch (Exception ignore) { + } // never happens + } + + @Override + @Deprecated + public Object clone() { + return new CronExpression(this); + } +} + +class ValueSet { + public int value; + public int pos; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/snowflake/SnowFlakeIdGenerator.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/snowflake/SnowFlakeIdGenerator.java new file mode 100644 index 00000000..cf3c3648 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/snowflake/SnowFlakeIdGenerator.java @@ -0,0 +1,93 @@ +package com.github.kfcfans.oms.server.common.utils.snowflake; + +/** + * Twitter SnowFlake(Scala -> Java) + * + * @author tjq + * @since 2020/4/6 + */ +public class SnowFlakeIdGenerator { + + /** + * 起始的时间戳 + */ + private final static long START_STAMP = 1480166465631L; + + /** + * 每一部分占用的位数 + */ + private final static long SEQUENCE_BIT = 12; //序列号占用的位数 + private final static long MACHINE_BIT = 5; //机器标识占用的位数 + private final static long DATA_CENTER_BIT = 5;//数据中心占用的位数 + + /** + * 每一部分的最大值 + */ + private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT); + private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT); + private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT); + + /** + * 每一部分向左的位移 + */ + private final static long MACHINE_LEFT = SEQUENCE_BIT; + private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; + private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT; + + private long dataCenterId; //数据中心 + private long machineId; //机器标识 + private long sequence = 0L; //序列号 + private long lastTimestamp = -1L;//上一次时间戳 + + public SnowFlakeIdGenerator(long dataCenterId, long machineId) { + if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) { + throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0"); + } + if (machineId > MAX_MACHINE_NUM || machineId < 0) { + throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); + } + this.dataCenterId = dataCenterId; + this.machineId = machineId; + } + + /** + * 产生下一个ID + */ + public synchronized long nextId() { + long currStamp = getNewStamp(); + if (currStamp < lastTimestamp) { + throw new RuntimeException("Clock moved backwards. Refusing to generate id"); + } + + if (currStamp == lastTimestamp) { + //相同毫秒内,序列号自增 + sequence = (sequence + 1) & MAX_SEQUENCE; + //同一毫秒的序列数已经达到最大 + if (sequence == 0L) { + currStamp = getNextMill(); + } + } else { + //不同毫秒内,序列号置为0 + sequence = 0L; + } + + lastTimestamp = currStamp; + + return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分 + | dataCenterId << DATA_CENTER_LEFT //数据中心部分 + | machineId << MACHINE_LEFT //机器标识部分 + | sequence; //序列号部分 + } + + private long getNextMill() { + long mill = getNewStamp(); + while (mill <= lastTimestamp) { + mill = getNewStamp(); + } + return mill; + } + + private long getNewStamp() { + return System.currentTimeMillis(); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java index 8eac5e96..97fd7120 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java @@ -26,6 +26,8 @@ public class OhMyServer { @Getter private static String actorSystemAddress; + private static final String AKKA_PATH = "akka://%s@%s/user/%s"; + public static void init() { // 1. 启动 ActorSystem @@ -51,7 +53,12 @@ public class OhMyServer { * @return ActorSelection */ public static ActorSelection getServerActor(String address) { - String path = String.format("akka://%s@%s/user/%s", RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME); + String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME); + return actorSystem.actorSelection(path); + } + + public static ActorSelection getTaskTrackerActor(String address) { + String path = String.format(AKKA_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME); return actorSystem.actorSelection(path); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java index 46c7f3a0..629c2037 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.core.akka; import akka.actor.AbstractActor; +import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; @@ -35,7 +36,19 @@ public class ServerActor extends AbstractActor { getSender().tell(askResponse, getSelf()); } + /** + * 处理 Worker 的心跳请求 + * @param heartbeat 心跳包 + */ private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) { WorkerManagerService.updateStatus(heartbeat); } + + /** + * 处理 instance 状态 + * @param req 任务实例的状态上报请求 + */ + private void onReceive(TaskTrackerReportInstanceStatusReq req) { + + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java index 80c71b16..98c5849f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java @@ -23,6 +23,7 @@ public class AppInfoDO { private String appName; private String description; + // 当前负责该 appName 旗下任务调度的server地址,IP:Port private String currentServer; private Date gmtCreate; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java similarity index 58% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobLogDO.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java index e06cc63a..d8359052 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java @@ -1,5 +1,7 @@ package com.github.kfcfans.oms.server.persistence.model; +import lombok.Data; + import javax.persistence.*; import java.util.Date; @@ -9,9 +11,10 @@ import java.util.Date; * @author tjq * @since 2020/3/30 */ +@Data @Entity -@Table(name = "job_log") -public class JobLogDO { +@Table(name = "execute_log", indexes = {@Index(columnList = "jobId")}) +public class ExecuteLogDO { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) @@ -20,13 +23,19 @@ public class JobLogDO { // 任务ID private Long jobId; // 任务实例ID - private String instanceId; - // 任务状态 运行中/成功/失败... + private Long instanceId; + /** + * 任务状态 {@link com.github.kfcfans.common.InstanceStatus} + */ private int status; // 执行结果 private String result; // 耗时 private Long usedTime; + // 预计触发时间 + private Long expectedTriggerTime; + // 实际触发时间 + private Long actualTriggerTime; private Date gmtCreate; private Date gmtModified; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java index 08065daa..5297441f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java @@ -14,7 +14,7 @@ import java.util.Date; */ @Data @Entity -@Table(name = "job_info") +@Table(name = "job_info", indexes = {@Index(columnList = "appId")}) public class JobInfoDO { @@ -29,6 +29,8 @@ public class JobInfoDO { private String jobDescription; // 任务所属的应用ID private Long appId; + // 任务自带的参数 + private String jobParams; /* ************************** 定时参数 ************************** */ // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) @@ -45,7 +47,7 @@ public class JobInfoDO { private String processorInfo; /* ************************** 运行时配置 ************************** */ - // 最大同时运行任务数 + // 最大同时运行任务数,默认 1 private Integer maxInstanceNum; // 并发度,同时执行某个任务的最大线程数量 private Integer concurrency; @@ -54,6 +56,10 @@ public class JobInfoDO { // 任务的每一个Task超时时间 private Long taskTimeLimit; + /* ************************** 重试配置 ************************** */ + private Integer instanceRetryNum; + private Integer taskRetryNum; + // 1 正常运行,2 停止(不再调度) private Integer status; // 下一次调度时间 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java index ac4bc0ae..8bd0472e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java @@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import org.springframework.data.jpa.repository.JpaRepository; +import java.util.List; + /** * AppInfo 数据访问层 * @@ -12,4 +14,10 @@ import org.springframework.data.jpa.repository.JpaRepository; public interface AppInfoRepository extends JpaRepository { AppInfoDO findByAppName(String appName); + + /** + * 根据 currentServer 查询 appId + * 其实只需要 id,处于性能考虑可以直接写SQL只返回ID + */ + List findAllByCurrentServer(String currentServer); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java new file mode 100644 index 00000000..84acd089 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.oms.server.persistence.repository; + +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; + +import java.util.List; + +/** + * JobLog 数据访问层 + * + * @author tjq + * @since 2020/4/1 + */ +public interface ExecuteLogRepository extends JpaRepository { + + long countByJobIdAndStatusIn(long jobId, List status); + + @Query(value = "update execute_log set status = ?2, result = ?3 where instance_id = ?1", nativeQuery = true) + int updateStatusAndLog(long instanceId, int status, String result); + + List findByJobIdIn(List jobIds); +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java index 1d011e58..00f73962 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java @@ -14,7 +14,7 @@ import java.util.List; public interface JobInfoRepository extends JpaRepository { - List findByAppIdInAndNextTriggerTimeLessThanEqual(List appIds, Long time); + List findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); - List findByAppIdAndNextTriggerTimeLessThanEqual(Long appId, Long time); + List findByAppIdInAndStatusAndTimeExpression(List appIds, int status, int timeExpressionType); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java deleted file mode 100644 index f0dd2cdc..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.github.kfcfans.oms.server.persistence.repository; - -import com.github.kfcfans.oms.server.persistence.model.JobLogDO; -import org.springframework.data.jpa.repository.JpaRepository; - -/** - * JobLog 数据访问层 - * - * @author tjq - * @since 2020/4/1 - */ -public interface JobLogRepository extends JpaRepository { - - long countByJobIdAndStatus(Long jobId, Integer status); - -} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index ebc3345d..c20bbb3b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -1,10 +1,23 @@ package com.github.kfcfans.oms.server.service; +import akka.actor.ActorSelection; +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.common.request.ServerScheduleJobReq; +import com.github.kfcfans.oms.server.core.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; -import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.List; + +import static com.github.kfcfans.common.InstanceStatus.*; + /** * 派送服务 @@ -12,16 +25,72 @@ import javax.annotation.Resource; * @author tjq * @since 2020/4/5 */ +@Slf4j @Service public class DispatchService { @Resource - private JobLogRepository jobLogRepository; + private ExecuteLogRepository executeLogRepository; - public void dispatch(JobInfoDO jobInfo) { + // 前三个状态都视为运行中 + private static final List runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()); - // 1. 查询当前运行的实例数 + private static final String FAILED_REASON = "%d instance is running"; + private static final String NO_WORKER_REASON = "no worker available"; + private static final String EMPTY_RESULT = ""; + public void dispatch(JobInfoDO jobInfo, long instanceId) { + log.debug("[DispatchService] start to dispatch job -> {}.", jobInfo); + + // 查询当前运行的实例数 + long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus); + + // 超出最大同时运行限制,不执行调度 + if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { + String result = String.format(FAILED_REASON, runningInstanceCount); + log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount); + executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), result); + + return; + } + + // 获取 Worker + String taskTrackerAddress = WorkerManagerService.chooseBestWorker(jobInfo.getAppId()); + List allAvailableWorker = WorkerManagerService.getAllAvailableWorker(jobInfo.getAppId()); + + if (StringUtils.isEmpty(taskTrackerAddress)) { + log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo); + executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), NO_WORKER_REASON); + return; + } + + // 消除非原子操作带来的潜在不一致 + allAvailableWorker.remove(taskTrackerAddress); + allAvailableWorker.add(taskTrackerAddress); + + // 构造请求 + ServerScheduleJobReq req = new ServerScheduleJobReq(); + req.setAllWorkerAddress(allAvailableWorker); + req.setJobId(jobInfo.getId()); + req.setInstanceId(instanceId); + + req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name()); + req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name()); + req.setProcessorInfo(jobInfo.getProcessorInfo()); + + req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit()); + req.setTaskTimeoutMS(jobInfo.getTaskTimeLimit()); + + req.setJobParams(jobInfo.getJobParams()); + req.setThreadConcurrency(jobInfo.getConcurrency()); + req.setTaskRetryNum(jobInfo.getTaskRetryNum()); + + // 发送请求(不可靠,需要一个后台线程定期轮询状态) + ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress); + taskTrackerActor.tell(req, null); + + // 修改状态 + executeLogRepository.updateStatusAndLog(instanceId, WAITING_WORKER_RECEIVE.getV(), EMPTY_RESULT); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/IdGenerateService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/IdGenerateService.java new file mode 100644 index 00000000..67ffc416 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/IdGenerateService.java @@ -0,0 +1,16 @@ +package com.github.kfcfans.oms.server.service; + +/** + * 唯一ID生成服务 + * + * @author tjq + * @since 2020/4/6 + */ +public class IdGenerateService { + + public static Long allocate() { + // TODO:换成合适的分布式ID生成算法 + return System.currentTimeMillis(); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java index 88680757..0f05ccb4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java @@ -4,10 +4,12 @@ import com.github.kfcfans.common.model.SystemMetrics; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Map; +import java.util.Set; /** * 管理Worker集群状态 @@ -61,14 +63,40 @@ public class ClusterStatusHolder { entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore()); for (Map.Entry entry : address2Metrics.entrySet()) { - long lastActiveTime = address2ActiveTime.getOrDefault(entry.getKey(), -1L); - long timeout = System.currentTimeMillis() - lastActiveTime; - if (timeout < WORKER_TIMEOUT_MS) { - return entry.getKey(); + String address = entry.getKey(); + if (available(address)) { + return address; } } log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics); return null; } + + /** + * 获取当前所有可用的 Worker + * @return List + */ + public List getAllAvailableWorker() { + List workers = Lists.newLinkedList(); + + address2Metrics.forEach((address, ignore) -> { + if (available(address)) { + workers.add(address); + } + }); + + return workers; + } + + private boolean available(String address) { + SystemMetrics metrics = address2Metrics.get(address); + if (metrics.calculateScore() == SystemMetrics.MIN_SCORE) { + return false; + } + + Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L); + long timeout = System.currentTimeMillis() - lastActiveTime; + return timeout < WORKER_TIMEOUT_MS; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index e02fdcbb..004baa42 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -72,10 +72,7 @@ public class ServerSelectService { try { // 可能上一台机器已经完成了Server选举,需要再次判断 - AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> { - log.error("[ServerSelectService] impossible, unless we just lost our database."); - return null; - }); + AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); if (isActive(appInfo.getCurrentServer())) { return appInfo.getCurrentServer(); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java index 963ad96b..ba818f52 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java @@ -3,10 +3,10 @@ package com.github.kfcfans.oms.server.service.ha; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; -import java.util.List; -import java.util.Map; +import java.util.*; /** * Worker 管理服务 @@ -17,7 +17,8 @@ import java.util.Map; @Slf4j public class WorkerManagerService { - private static final Map appName2ClusterStatus = Maps.newConcurrentMap(); + // 存储Worker健康信息,appId -> ClusterStatusHolder + private static final Map appId2ClusterStatus = Maps.newConcurrentMap(); /** * 更新状态 @@ -26,7 +27,7 @@ public class WorkerManagerService { public static void updateStatus(WorkerHeartbeat heartbeat) { Long appId = heartbeat.getAppId(); String appName = heartbeat.getAppName(); - ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName)); + ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName)); clusterStatusHolder.updateStatus(heartbeat); } @@ -36,7 +37,7 @@ public class WorkerManagerService { * @return Worker的地址(null代表没有可用的Worker) */ public static String chooseBestWorker(Long appId) { - ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.get(appId); + ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); if (clusterStatusHolder == null) { log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId); return null; @@ -45,11 +46,24 @@ public class WorkerManagerService { } /** - * 获取当前该 Server 管理的所有应用ID - * @return List + * 获取当前所有可用的Worker地址 */ - public static List listAppIds() { - return Lists.newArrayList(appName2ClusterStatus.keySet()); + public static List getAllAvailableWorker(Long appId) { + ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); + if (clusterStatusHolder == null) { + log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId); + return Collections.emptyList(); + } + return clusterStatusHolder.getAllAvailableWorker(); + } + + /** + * 清理不需要的worker信息 + * @param usingAppIds 需要维护的appId,其余的数据将被删除 + */ + public static void clean(List usingAppIds) { + Set keys = Sets.newHashSet(usingAppIds); + appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java index a29247a7..a9692233 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java @@ -1,19 +1,34 @@ package com.github.kfcfans.oms.server.service.schedule; -import com.github.kfcfans.common.utils.CommonUtils; +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; +import com.github.kfcfans.oms.server.common.utils.CronExpression; +import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; -import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.service.DispatchService; +import com.github.kfcfans.oms.server.service.IdGenerateService; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; -import com.github.kfcfans.oms.server.service.lock.LockService; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -29,46 +44,163 @@ public class JobScheduleService { private static final int MAX_BATCH_NUM = 10; @Resource - private LockService lockService; + private DispatchService dispatchService; + + @Resource + private AppInfoRepository appInfoRepository; @Resource private JobInfoRepository jobInfoRepository; @Resource - private JobLogRepository jobLogRepository; + private ExecuteLogRepository executeLogRepository; - private static final String SCHEDULE_LOCK = "schedule_lock_%d"; - private static final long SCHEDULE_RATE = 10000; + private static final long SCHEDULE_RATE = 5000; @Scheduled(fixedRate = SCHEDULE_RATE) - private void getJob() { - List allAppIds = WorkerManagerService.listAppIds(); - if (CollectionUtils.isEmpty(allAppIds)) { + public void timingSchedule() { + + Stopwatch stopwatch = Stopwatch.createStarted(); + + // 先查询DB,查看本机需要负责的任务 + List allAppInfos = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress()); + if (CollectionUtils.isEmpty(allAppInfos)) { log.info("[JobScheduleService] current server has no app's job to schedule."); return; } + List allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList()); - long timeThreshold = System.currentTimeMillis() + 2 * SCHEDULE_RATE; - Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> { + try { + scheduleCornJob(allAppIds); + }catch (Exception e) { + log.error("[JobScheduleService] schedule cron job failed.", e); + } + log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch); + stopwatch.reset(); - List lockNames = partAppIds.stream().map(JobScheduleService::genLock).collect(Collectors.toList()); - // 1. 先批量获取锁,获取不到就改成单个循环模式 - boolean batchLock = lockService.batchLock(lockNames); - if (!batchLock) { + try { + scheduleFrequentJob(allAppIds); + }catch (Exception e) { + log.error("[JobScheduleService] schedule frequent job failed.", e); + } + log.info("[JobScheduleService] finished frequent schedule, using time {}.", stopwatch); + stopwatch.stop(); + } - }else { - try { - List jobInfos = jobInfoRepository.findByAppIdInAndNextTriggerTimeLessThanEqual(partAppIds, timeThreshold); + /** + * 调度 CRON 表达式类型的任务 + */ + private void scheduleCornJob(List appIds) { - // 顺序:先推入进时间轮 -> 写jobLog表 -> 更新nextTriggerTime(原则:宁可重复执行,也不能不调度) + // 清理不需要维护的数据 + WorkerManagerService.clean(appIds); + + long nowTime = System.currentTimeMillis(); + long timeThreshold = nowTime + 2 * SCHEDULE_RATE; + Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { + + try { + + // 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 + List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + + // 1. 批量写日志表 + Map jobId2InstanceId = Maps.newHashMap(); + List executeLogs = Lists.newLinkedList(); + jobInfos.forEach(jobInfoDO -> { + + ExecuteLogDO executeLog = new ExecuteLogDO(); + executeLog.setJobId(jobInfoDO.getId()); + executeLog.setInstanceId(IdGenerateService.allocate()); + executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + executeLog.setExpectedTriggerTime(jobInfoDO.getNextTriggerTime()); + executeLog.setGmtCreate(new Date()); + executeLog.setGmtModified(executeLog.getGmtCreate()); + + executeLogs.add(executeLog); + + jobId2InstanceId.put(executeLog.getJobId(), executeLog.getInstanceId()); + }); + executeLogRepository.saveAll(executeLogs); + executeLogRepository.flush(); + + // 2. 推入时间轮中等待调度执行 + jobInfos.forEach(jobInfoDO -> { + + long targetTriggerTime = jobInfoDO.getNextTriggerTime(); + long delay = 0; + if (targetTriggerTime < nowTime) { + log.warn("[JobScheduleService] Job({}) was delayed.", jobInfoDO); + }else { + delay = targetTriggerTime - nowTime; + } + + HashedWheelTimerHolder.TIMER.schedule(() -> { + dispatchService.dispatch(jobInfoDO, jobId2InstanceId.get(jobInfoDO.getId())); + }, delay, TimeUnit.MILLISECONDS); + + }); + + // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) + Date now = new Date(); + List updatedJobInfos = Lists.newLinkedList(); + jobInfos.forEach(jobInfoDO -> { + + try { + CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); + Date nextTriggerTime = cronExpression.getNextValidTimeAfter(now); + + JobInfoDO updatedJobInfo = new JobInfoDO(); + BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); + updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); + updatedJobInfo.setGmtModified(now); + + updatedJobInfos.add(updatedJobInfo); + } catch (Exception e) { + log.error("[JobScheduleService] calculate next trigger time for job({}) failed.", jobInfoDO, e); + } + }); + jobInfoRepository.saveAll(updatedJobInfos); + jobInfoRepository.flush(); - }catch (Exception e) { - - } + }catch (Exception e) { + log.error("[JobScheduleService] schedule job failed.", e); } }); } - private static String genLock(Long appId) { - return String.format(SCHEDULE_LOCK, appId); + /** + * 调度 FIX_RATE 和 FIX_DELAY 的任务 + */ + private void scheduleFrequentJob(List appIds) { + + List fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV()); + List fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV()); + + List jobIds = Lists.newLinkedList(); + Map jobId2JobInfo = Maps.newHashMap(); + Consumer consumer = jobInfo -> { + jobIds.add(jobInfo.getId()); + jobId2JobInfo.put(jobInfo.getId(), jobInfo); + }; + fixDelayJobs.forEach(consumer); + fixRateJobs.forEach(consumer); + + if (CollectionUtils.isEmpty(jobIds)) { + log.debug("[JobScheduleService] no frequent job need to schedule."); + return; + } + + // 查询 ExecuteLog 表,不存在或非运行状态则重新调度 + List executeLogDOS = executeLogRepository.findByJobIdIn(jobIds); + executeLogDOS.forEach(executeLogDO -> { + if (executeLogDO.getStatus() == InstanceStatus.RUNNING.getV()) { + jobId2JobInfo.remove(executeLogDO.getJobId()); + } + }); + + // 重新Dispatch + jobId2JobInfo.values().forEach(jobInfoDO -> { + + }); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index a0ce3dce..9dcb503a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; +import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.web.ResultDTO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; @@ -14,6 +15,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.Date; /** * 任务信息管理 Controller @@ -29,15 +31,22 @@ public class JobController { private JobInfoRepository jobInfoRepository; @PostMapping("/save") - public ResultDTO saveJobInfo(ModifyJobInfoRequest request) { + public ResultDTO saveJobInfo(ModifyJobInfoRequest request) throws Exception { JobInfoDO jobInfoDO = new JobInfoDO(); BeanUtils.copyProperties(request, jobInfoDO); // 拷贝枚举值 + TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpression()); jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV()); jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV()); - jobInfoDO.setTimeExpressionType(TimeExpressionType.valueOf(request.getTimeExpression()).getV()); + jobInfoDO.setTimeExpressionType(timeExpressionType.getV()); + // 计算下次调度时间 + if (timeExpressionType == TimeExpressionType.CRON) { + CronExpression cronExpression = new CronExpression(request.getTimeExpression()); + Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); + jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); + } jobInfoRepository.saveAndFlush(jobInfoDO); return ResultDTO.success(null); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java index 8499d265..42ffd3d9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java @@ -11,6 +11,8 @@ import lombok.Data; @Data public class ModifyJobInfoRequest { + // null -> 插入,否则为更新 + private Long id; /* ************************** 任务基本信息 ************************** */ // 任务名称 private String jobName; @@ -20,6 +22,8 @@ public class ModifyJobInfoRequest { private Long appId; // 任务分组名称(仅用于前端展示的分组) private String groupName; + // 任务自带的参数 + private String jobParams; /* ************************** 定时参数 ************************** */ // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) @@ -36,6 +40,7 @@ public class ModifyJobInfoRequest { // 执行器信息 private String processorInfo; + /* ************************** 运行时配置 ************************** */ // 最大同时运行任务数 private Integer maxInstanceNum; @@ -45,4 +50,11 @@ public class ModifyJobInfoRequest { private Long instanceTimeLimit; // 任务的每一个Task超时时间 private Long taskTimeLimit; + + /* ************************** 重试配置 ************************** */ + private Integer instanceRetryNum; + private Integer taskRetryNum; + + // 1 正常运行,2 停止(不再调度) + private Integer status; } diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java index eeb6dcfc..616cf745 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java @@ -61,4 +61,9 @@ public class UtilsTest { Thread.sleep(277777777); } + @Test + public void testCronExpression() { + + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java index 440324d3..871569ba 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java @@ -30,8 +30,8 @@ public class ProcessorTrackerActor extends AbstractActor { * 处理来自TaskTracker的task执行请求 */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { - String jobId = req.getInstanceInfo().getJobId(); - String instanceId = req.getInstanceInfo().getInstanceId(); + Long jobId = req.getInstanceInfo().getJobId(); + Long instanceId = req.getInstanceInfo().getInstanceId(); ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> { ProcessorTracker pt = new ProcessorTracker(req); log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId); @@ -50,7 +50,7 @@ public class ProcessorTrackerActor extends AbstractActor { private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { - String instanceId = req.getInstanceId(); + Long instanceId = req.getInstanceId(); ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId); if (processorTracker == null) { log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 1dde9355..9d314592 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -126,7 +126,7 @@ public class TaskTrackerActor extends AbstractActor { * 服务器任务调度处理器 */ private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { - String instanceId = req.getInstanceId(); + Long instanceId = req.getInstanceId(); TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(instanceId); if (taskTracker != null) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java deleted file mode 100644 index 41ee4427..00000000 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.github.kfcfans.oms.worker.common.constants; - -import com.google.common.base.Splitter; - -/** - * splitter & joiner - * - * @author tjq - * @since 2020/3/17 - */ -public class CommonSJ { - public static final Splitter commaSplitter = Splitter.on(","); -} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 7d2d5995..38d668d8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -44,7 +44,7 @@ public class ProcessorRunnable implements Runnable { public void innerRun() { String taskId = task.getTaskId(); - String instanceId = task.getInstanceId(); + Long instanceId = task.getInstanceId(); log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java index 24f5fa37..c252f4a6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -1,6 +1,5 @@ package com.github.kfcfans.oms.worker.core.ha; -import com.github.kfcfans.oms.worker.common.constants.CommonSJ; import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -18,12 +17,10 @@ public class ProcessorTrackerStatusHolder { private final Map ip2Status; - public ProcessorTrackerStatusHolder(String allWorkerAddress) { + public ProcessorTrackerStatusHolder(List allWorkerAddress) { ip2Status = Maps.newConcurrentMap(); - - List addressList = CommonSJ.commaSplitter.splitToList(allWorkerAddress); - addressList.forEach(ip -> { + allWorkerAddress.forEach(ip -> { ProcessorTrackerStatus pts = new ProcessorTrackerStatus(); pts.init(ip); ip2Status.put(ip, pts); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index e5d59b6d..4e25c7c2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -33,7 +33,7 @@ public class ProcessorTracker { // 任务实例信息 private InstanceInfo instanceInfo; // 冗余 instanceId,方便日志 - private String instanceId; + private Long instanceId; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java index cc6d6bf5..e8b047c2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java @@ -14,23 +14,23 @@ import java.util.function.Function; */ public class ProcessorTrackerPool { - private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap(); + private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap(); /** * 获取 ProcessorTracker,如果不存在则创建 */ - public static ProcessorTracker getProcessorTracker(String instanceId, Function creator) { + public static ProcessorTracker getProcessorTracker(Long instanceId, Function creator) { return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator); } /** * 获取 ProcessorTracker */ - public static ProcessorTracker getProcessorTracker(String instanceId) { + public static ProcessorTracker getProcessorTracker(Long instanceId) { return instanceId2ProcessorTracker.get(instanceId); } - public static void removeProcessorTracker(String instanceId) { + public static void removeProcessorTracker(Long instanceId) { instanceId2ProcessorTracker.remove(instanceId); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 96a39e28..b35e56f6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -92,7 +92,7 @@ public class TaskTracker { * 更新任务状态 * 任务状态机只允许数字递增 */ - public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result) { + public void updateTaskStatus(Long instanceId, String taskId, int newStatus, @Nullable String result) { boolean updateResult; TaskStatus nTaskStatus = TaskStatus.of(newStatus); @@ -217,7 +217,7 @@ public class TaskTracker { CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow()); // 1. 通知 ProcessorTracker 释放资源 - String instanceId = instanceInfo.getInstanceId(); + Long instanceId = instanceInfo.getInstanceId(); TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { @@ -257,7 +257,7 @@ public class TaskTracker { } Stopwatch stopwatch = Stopwatch.createStarted(); - String instanceId = instanceInfo.getInstanceId(); + Long instanceId = instanceInfo.getInstanceId(); // 1. 获取可以派发任务的 ProcessorTracker List availablePtIps = ptStatusHolder.getAvailableProcessorTrackers(); @@ -323,7 +323,7 @@ public class TaskTracker { private void innerRun() { - final String instanceId = instanceInfo.getInstanceId(); + Long instanceId = instanceInfo.getInstanceId(); // 1. 查询统计信息 Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId); @@ -396,7 +396,7 @@ public class TaskTracker { boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); req.setResult(resultTask.getResult()); - req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getValue() : InstanceStatus.FAILED.getValue()); + req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS)); @@ -422,7 +422,7 @@ public class TaskTracker { } // 4. 未完成,上报状态 - req.setInstanceStatus(InstanceStatus.RUNNING.getValue()); + req.setInstanceStatus(InstanceStatus.RUNNING.getV()); serverActor.tell(req, null); // 5.1 定期检查 -> 重试派发后未确认的任务 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java index e5296da6..a112ba2d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java @@ -13,20 +13,20 @@ import java.util.function.Function; */ public class TaskTrackerPool { - private static final Map instanceId2TaskTracker = Maps.newConcurrentMap(); + private static final Map instanceId2TaskTracker = Maps.newConcurrentMap(); /** * 获取 ProcessorTracker,如果不存在则创建 */ - public static TaskTracker getTaskTrackerPool(String instanceId) { + public static TaskTracker getTaskTrackerPool(Long instanceId) { return instanceId2TaskTracker.get(instanceId); } - public static void remove(String instanceId) { + public static void remove(Long instanceId) { instanceId2TaskTracker.remove(instanceId); } - public static void atomicCreateTaskTracker(String instanceId, Function creator) { + public static void atomicCreateTaskTracker(Long instanceId, Function creator) { instanceId2TaskTracker.computeIfAbsent(instanceId, creator); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java index 89afb841..85d46bae 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java @@ -15,8 +15,8 @@ public class SimpleTaskQuery { private static final String LINK = " and "; private String taskId; - private String jobId; - private String instanceId; + private Long jobId; + private Long instanceId; private String taskName; private String address; private Integer status; @@ -37,10 +37,10 @@ public class SimpleTaskQuery { sb.append("task_id = '").append(taskId).append("'").append(LINK); } if (!StringUtils.isEmpty(jobId)) { - sb.append("job_id = '").append(jobId).append("'").append(LINK); + sb.append("job_id = ").append(jobId).append(LINK); } if (!StringUtils.isEmpty(instanceId)) { - sb.append("instance_id = '").append(instanceId).append("'").append(LINK); + sb.append("instance_id = ").append(instanceId).append(LINK); } if (!StringUtils.isEmpty(address)) { sb.append("address = '").append(address).append("'").append(LINK); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index 88f99d43..047582c5 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -35,6 +35,6 @@ public interface TaskDAO { /** * 查询 taskId -> taskResult (为了性能特殊定制,主要是内存占用,如果使用 simpleQueryPlus,内存中需要同时存在3份数据 ?是同时存在3份数据吗) */ - Map queryTaskId2TaskResult(String instanceId) throws SQLException; + Map queryTaskId2TaskResult(Long instanceId) throws SQLException; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 4b660542..7a80ef4d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -22,7 +22,7 @@ public class TaskDAOImpl implements TaskDAO { public void initTable() throws Exception { String delTableSQL = "drop table if exists task_info"; - String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))"; + String createTableSQL = "create table task_info (task_id varchar(20), instance_id bigint(20), job_id bigint(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))"; try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.execute(delTableSQL); @@ -130,12 +130,12 @@ public class TaskDAOImpl implements TaskDAO { } @Override - public Map queryTaskId2TaskResult(String instanceId) throws SQLException { + public Map queryTaskId2TaskResult(Long instanceId) throws SQLException { ResultSet rs = null; Map taskId2Result = Maps.newLinkedHashMapWithExpectedSize(4096); String sql = "select task_id, result from task_info where instance_id = ?"; try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { - ps.setString(1, instanceId); + ps.setLong(1, instanceId); rs = ps.executeQuery(); while (rs.next()) { taskId2Result.put(rs.getString("task_id"), rs.getString("result")); @@ -154,8 +154,8 @@ public class TaskDAOImpl implements TaskDAO { private static TaskDO convert(ResultSet rs) throws SQLException { TaskDO task = new TaskDO(); task.setTaskId(rs.getString("task_id")); - task.setInstanceId(rs.getString("instance_id")); - task.setJobId(rs.getString("job_id")); + task.setInstanceId(rs.getLong("instance_id")); + task.setJobId(rs.getLong("job_id")); task.setTaskName(rs.getString("task_name")); task.setTaskContent(rs.getBytes("task_content")); task.setAddress(rs.getString("address")); @@ -169,8 +169,8 @@ public class TaskDAOImpl implements TaskDAO { private static void fillInsertPreparedStatement(TaskDO task, PreparedStatement ps) throws SQLException { ps.setString(1, task.getTaskId()); - ps.setString(2, task.getInstanceId()); - ps.setString(3, task.getJobId()); + ps.setLong(2, task.getInstanceId()); + ps.setLong(3, task.getJobId()); ps.setString(4, task.getTaskName()); ps.setBytes(5, task.getTaskContent()); ps.setString(6, task.getAddress()); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java index f626a389..3b6d2560 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java @@ -19,8 +19,8 @@ public class TaskDO { // 层次命名法,可以表示 Map 后的父子关系,如 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task private String taskId; - private String jobId; - private String instanceId; + private Long jobId; + private Long instanceId; // 任务名称 private String taskName; // 任务对象(序列化后的二进制数据) diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 4c41de00..551aa88c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -70,7 +70,7 @@ public class TaskPersistenceService { /** * 依靠主键更新 Task */ - public boolean updateTask(String instanceId, String taskId, TaskDO updateEntity) { + public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) { try { updateEntity.setLastModifiedTime(System.currentTimeMillis()); SimpleTaskQuery query = genKeyQuery(instanceId, taskId); @@ -110,7 +110,7 @@ public class TaskPersistenceService { /** * 获取 MapReduce 或 Broadcast 的最后一个任务 */ - public Optional getLastTask(String instanceId) { + public Optional getLastTask(Long instanceId) { try { SimpleTaskQuery query = new SimpleTaskQuery(); @@ -130,7 +130,7 @@ public class TaskPersistenceService { return Optional.empty(); } - public List getAllTask(String instanceId) { + public List getAllTask(Long instanceId) { try { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); @@ -146,7 +146,7 @@ public class TaskPersistenceService { /** * 获取指定状态的Task */ - public List getTaskByStatus(String instanceId, TaskStatus status, int limit) { + public List getTaskByStatus(Long instanceId, TaskStatus status, int limit) { try { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); @@ -163,7 +163,7 @@ public class TaskPersistenceService { * 获取 TaskTracker 管理的子 task 状态统计信息 * TaskStatus -> num */ - public Map getTaskStatusStatistics(String instanceId) { + public Map getTaskStatusStatistics(Long instanceId) { try { SimpleTaskQuery query = new SimpleTaskQuery(); @@ -191,7 +191,7 @@ public class TaskPersistenceService { /** * 查询 taskId -> taskResult,reduce阶段或postProcess 阶段使用 */ - public Map getTaskId2ResultMap(String instanceId) { + public Map getTaskId2ResultMap(Long instanceId) { try { return execute(() -> taskDAO.queryTaskId2TaskResult(instanceId)); }catch (Exception e) { @@ -203,7 +203,7 @@ public class TaskPersistenceService { /** * 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,效果惊人...磁盘I/O果然是重要瓶颈...) */ - public Optional getTaskStatus(String instanceId, String taskId) { + public Optional getTaskStatus(Long instanceId, String taskId) { try { SimpleTaskQuery query = genKeyQuery(instanceId, taskId); @@ -221,7 +221,7 @@ public class TaskPersistenceService { /** * 查询任务失败数量(只查询 failed_cnt,节约 I/O 资源) */ - public Optional getTaskFailedCnt(String instanceId, String taskId) { + public Optional getTaskFailedCnt(Long instanceId, String taskId) { try { SimpleTaskQuery query = genKeyQuery(instanceId, taskId); @@ -241,7 +241,7 @@ public class TaskPersistenceService { /** * 批量更新 Task 状态 */ - public boolean batchUpdateTaskStatus(String instanceId, List taskIds, TaskStatus status, String result) { + public boolean batchUpdateTaskStatus(Long instanceId, List taskIds, TaskStatus status, String result) { try { return execute(() -> { @@ -262,7 +262,7 @@ public class TaskPersistenceService { } - public boolean deleteAllTasks(String instanceId) { + public boolean deleteAllTasks(Long instanceId) { try { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); @@ -286,7 +286,7 @@ public class TaskPersistenceService { return Collections.emptyList(); } - private static SimpleTaskQuery genKeyQuery(String instanceId, String taskId) { + private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); condition.setTaskId(taskId); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java index 0a7c45ad..5fa0f504 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java @@ -16,8 +16,8 @@ public class InstanceInfo implements Serializable { /** * 基础信息 */ - private String jobId; - private String instanceId; + private Long jobId; + private Long instanceId; /** * 任务执行处理器信息 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java index 935338ea..082c83c6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java @@ -13,7 +13,7 @@ import java.io.Serializable; @Data public class BroadcastTaskPreExecuteFinishedReq implements Serializable { - private String instanceId; + private Long instanceId; private String taskId; private boolean success; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java index f7cda7de..8fb2d595 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java @@ -21,7 +21,7 @@ import java.util.List; @NoArgsConstructor public class ProcessorMapTaskRequest implements Serializable { - private String instanceId; + private Long instanceId; private String taskName; private List subTasks; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java index 9519d9a8..7a2a743c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java @@ -13,7 +13,7 @@ import java.io.Serializable; @Data public class ProcessorReportTaskStatusReq implements Serializable { - private String instanceId; + private Long instanceId; private String taskId; private int status; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java index 7dc8970e..b0d4c41f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java @@ -14,7 +14,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class ProcessorTrackerStatusReportReq { - private String instanceId; + private Long instanceId; /** * 请求发起时间 @@ -31,7 +31,7 @@ public class ProcessorTrackerStatusReportReq { */ private String ip; - public ProcessorTrackerStatusReportReq(String instanceId, long remainTaskNum) { + public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) { this.instanceId = instanceId; this.remainTaskNum = remainTaskNum; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java index 56e0fde3..f60e1c5c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java @@ -14,7 +14,7 @@ import java.io.Serializable; @Data public class TaskTrackerStopInstanceReq implements Serializable { - private String instanceId; + private Long instanceId; // 保留字段,暂时没用 private String type; } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java index 18578b1f..b1439ae0 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java @@ -29,8 +29,8 @@ public class PersistenceServiceTest { TaskDO task = new TaskDO(); taskList.add(task); - task.setJobId("1"); - task.setInstanceId("10086" + ThreadLocalRandom.current().nextInt(2)); + task.setJobId(1L); + task.setInstanceId(10086L + ThreadLocalRandom.current().nextInt(2)); task.setTaskId(i + ""); task.setFailedCnt(0); task.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); @@ -63,7 +63,7 @@ public class PersistenceServiceTest { public void testDeleteAllTasks() { System.out.println("=============== testBatchDelete ==============="); - boolean delete = taskPersistenceService.deleteAllTasks("100860"); + boolean delete = taskPersistenceService.deleteAllTasks(100860L); System.out.println("delete result:" + delete); } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java index 8e6869ae..6a51843b 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java @@ -64,8 +64,8 @@ public class ProcessorTrackerTest { InstanceInfo instanceInfo = new InstanceInfo(); - instanceInfo.setJobId("1"); - instanceInfo.setInstanceId("10086"); + instanceInfo.setJobId(1L); + instanceInfo.setInstanceId(10086L); instanceInfo.setExecuteType(ExecuteType.STANDALONE.name()); instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java index 4e7bf8c6..c400733c 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java @@ -10,6 +10,7 @@ import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.common.utils.NetUtils; +import com.google.common.collect.Lists; import com.typesafe.config.ConfigFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -54,9 +55,9 @@ public class TaskTrackerTest { private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) { ServerScheduleJobReq req = new ServerScheduleJobReq(); - req.setJobId("1"); - req.setInstanceId("10086"); - req.setAllWorkerAddress(NetUtils.getLocalHost()); + req.setJobId(1L); + req.setInstanceId(10086L); + req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost())); req.setJobParams("this is job Params"); req.setInstanceParams("this is instance Params");