+ * Cron expressions are comprised of 6 required fields and one optional field
+ * separated by white space. The fields respectively are described as follows:
+ *
+ *
+ *
+ *
Field Name
+ *
+ *
Allowed Values
+ *
+ *
Allowed Special Characters
+ *
+ *
+ *
Seconds
+ *
+ *
0-59
+ *
+ *
, - * /
+ *
+ *
+ *
Minutes
+ *
+ *
0-59
+ *
+ *
, - * /
+ *
+ *
+ *
Hours
+ *
+ *
0-23
+ *
+ *
, - * /
+ *
+ *
+ *
Day-of-month
+ *
+ *
1-31
+ *
+ *
, - * ? / L W
+ *
+ *
+ *
Month
+ *
+ *
0-11 or JAN-DEC
+ *
+ *
, - * /
+ *
+ *
+ *
Day-of-Week
+ *
+ *
1-7 or SUN-SAT
+ *
+ *
, - * ? / L #
+ *
+ *
+ *
Year (Optional)
+ *
+ *
empty, 1970-2199
+ *
+ *
, - * /
+ *
+ *
+ *
+ * The '*' character is used to specify all values. For example, "*"
+ * in the minute field means "every minute".
+ *
+ * The '?' character is allowed for the day-of-month and day-of-week fields. It
+ * is used to specify 'no specific value'. This is useful when you need to
+ * specify something in one of the two fields, but not the other.
+ *
+ * The '-' character is used to specify ranges For example "10-12" in
+ * the hour field means "the hours 10, 11 and 12".
+ *
+ * The ',' character is used to specify additional values. For example
+ * "MON,WED,FRI" in the day-of-week field means "the days Monday,
+ * Wednesday, and Friday".
+ *
+ * The '/' character is used to specify increments. For example "0/15"
+ * in the seconds field means "the seconds 0, 15, 30, and 45". And
+ * "5/15" in the seconds field means "the seconds 5, 20, 35, and
+ * 50". Specifying '*' before the '/' is equivalent to specifying 0 is
+ * the value to start with. Essentially, for each field in the expression, there
+ * is a set of numbers that can be turned on or off. For seconds and minutes,
+ * the numbers range from 0 to 59. For hours 0 to 23, for days of the month 0 to
+ * 31, and for months 0 to 11 (JAN to DEC). The "/" character simply helps you turn
+ * on every "nth" value in the given set. Thus "7/6" in the
+ * month field only turns on month "7", it does NOT mean every 6th
+ * month, please note that subtlety.
+ *
+ * The 'L' character is allowed for the day-of-month and day-of-week fields.
+ * This character is short-hand for "last", but it has different
+ * meaning in each of the two fields. For example, the value "L" in
+ * the day-of-month field means "the last day of the month" - day 31
+ * for January, day 28 for February on non-leap years. If used in the
+ * day-of-week field by itself, it simply means "7" or
+ * "SAT". But if used in the day-of-week field after another value, it
+ * means "the last xxx day of the month" - for example "6L"
+ * means "the last friday of the month". You can also specify an offset
+ * from the last day of the month, such as "L-3" which would mean the third-to-last
+ * day of the calendar month. When using the 'L' option, it is important not to
+ * specify lists, or ranges of values, as you'll get confusing/unexpected results.
+ *
+ * The 'W' character is allowed for the day-of-month field. This character
+ * is used to specify the weekday (Monday-Friday) nearest the given day. As an
+ * example, if you were to specify "15W" as the value for the
+ * day-of-month field, the meaning is: "the nearest weekday to the 15th of
+ * the month". So if the 15th is a Saturday, the trigger will fire on
+ * Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the
+ * 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th.
+ * However if you specify "1W" as the value for day-of-month, and the
+ * 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not
+ * 'jump' over the boundary of a month's days. The 'W' character can only be
+ * specified when the day-of-month is a single day, not a range or list of days.
+ *
+ * The 'L' and 'W' characters can also be combined for the day-of-month
+ * expression to yield 'LW', which translates to "last weekday of the
+ * month".
+ *
+ * The '#' character is allowed for the day-of-week field. This character is
+ * used to specify "the nth" XXX day of the month. For example, the
+ * value of "6#3" in the day-of-week field means the third Friday of
+ * the month (day 6 = Friday and "#3" = the 3rd one in the month).
+ * Other examples: "2#1" = the first Monday of the month and
+ * "4#5" = the fifth Wednesday of the month. Note that if you specify
+ * "#5" and there is not 5 of the given day-of-week in the month, then
+ * no firing will occur that month. If the '#' character is used, there can
+ * only be one expression in the day-of-week field ("3#1,6#3" is
+ * not valid, since there are two expressions).
+ *
+ *
+ *
+ * The legal characters and the names of months and days of the week are not
+ * case sensitive.
+ *
+ *
+ * NOTES:
+ *
+ *
Support for specifying both a day-of-week and a day-of-month value is
+ * not complete (you'll need to use the '?' character in one of these fields).
+ *
+ *
Overflowing ranges is supported - that is, having a larger number on
+ * the left hand side than the right. You might do 22-2 to catch 10 o'clock
+ * at night until 2 o'clock in the morning, or you might have NOV-FEB. It is
+ * very important to note that overuse of overflowing ranges creates ranges
+ * that don't make sense and no effort has been made to determine which
+ * interpretation CronExpression chooses. An example would be
+ * "0 0 14-6 ? * FRI-MON".
+ *
+ *
+ *
+ *
+ * @author Sharada Jambula, James House
+ * @author Contributions from Mads Henderson
+ * @author Refactoring from CronTrigger to CronExpression by Aaron Craven
+ */
+public final class CronExpression implements Serializable, Cloneable {
+
+ private static final long serialVersionUID = 12423409423L;
+
+ protected static final int SECOND = 0;
+ protected static final int MINUTE = 1;
+ protected static final int HOUR = 2;
+ protected static final int DAY_OF_MONTH = 3;
+ protected static final int MONTH = 4;
+ protected static final int DAY_OF_WEEK = 5;
+ protected static final int YEAR = 6;
+ protected static final int ALL_SPEC_INT = 99; // '*'
+ protected static final int NO_SPEC_INT = 98; // '?'
+ protected static final Integer ALL_SPEC = ALL_SPEC_INT;
+ protected static final Integer NO_SPEC = NO_SPEC_INT;
+
+ protected static final Map monthMap = new HashMap(20);
+ protected static final Map dayMap = new HashMap(60);
+ static {
+ monthMap.put("JAN", 0);
+ monthMap.put("FEB", 1);
+ monthMap.put("MAR", 2);
+ monthMap.put("APR", 3);
+ monthMap.put("MAY", 4);
+ monthMap.put("JUN", 5);
+ monthMap.put("JUL", 6);
+ monthMap.put("AUG", 7);
+ monthMap.put("SEP", 8);
+ monthMap.put("OCT", 9);
+ monthMap.put("NOV", 10);
+ monthMap.put("DEC", 11);
+
+ dayMap.put("SUN", 1);
+ dayMap.put("MON", 2);
+ dayMap.put("TUE", 3);
+ dayMap.put("WED", 4);
+ dayMap.put("THU", 5);
+ dayMap.put("FRI", 6);
+ dayMap.put("SAT", 7);
+ }
+
+ private final String cronExpression;
+ private TimeZone timeZone = null;
+ protected transient TreeSet seconds;
+ protected transient TreeSet minutes;
+ protected transient TreeSet hours;
+ protected transient TreeSet daysOfMonth;
+ protected transient TreeSet months;
+ protected transient TreeSet daysOfWeek;
+ protected transient TreeSet years;
+
+ protected transient boolean lastdayOfWeek = false;
+ protected transient int nthdayOfWeek = 0;
+ protected transient boolean lastdayOfMonth = false;
+ protected transient boolean nearestWeekday = false;
+ protected transient int lastdayOffset = 0;
+ protected transient boolean expressionParsed = false;
+
+ public static final int MAX_YEAR = Calendar.getInstance().get(Calendar.YEAR) + 100;
+
+ /**
+ * Constructs a new CronExpression based on the specified
+ * parameter.
+ *
+ * @param cronExpression String representation of the cron expression the
+ * new object should represent
+ * @throws java.text.ParseException
+ * if the string expression cannot be parsed into a valid
+ * CronExpression
+ */
+ public CronExpression(String cronExpression) throws ParseException {
+ if (cronExpression == null) {
+ throw new IllegalArgumentException("cronExpression cannot be null");
+ }
+
+ this.cronExpression = cronExpression.toUpperCase(Locale.US);
+
+ buildExpression(this.cronExpression);
+ }
+
+ /**
+ * Constructs a new {@code CronExpression} as a copy of an existing
+ * instance.
+ *
+ * @param expression
+ * The existing cron expression to be copied
+ */
+ public CronExpression(CronExpression expression) {
+ /*
+ * We don't call the other constructor here since we need to swallow the
+ * ParseException. We also elide some of the sanity checking as it is
+ * not logically trippable.
+ */
+ this.cronExpression = expression.getCronExpression();
+ try {
+ buildExpression(cronExpression);
+ } catch (ParseException ex) {
+ throw new AssertionError();
+ }
+ if (expression.getTimeZone() != null) {
+ setTimeZone((TimeZone) expression.getTimeZone().clone());
+ }
+ }
+
+ /**
+ * Indicates whether the given date satisfies the cron expression. Note that
+ * milliseconds are ignored, so two Dates falling on different milliseconds
+ * of the same second will always have the same result here.
+ *
+ * @param date the date to evaluate
+ * @return a boolean indicating whether the given date satisfies the cron
+ * expression
+ */
+ public boolean isSatisfiedBy(Date date) {
+ Calendar testDateCal = Calendar.getInstance(getTimeZone());
+ testDateCal.setTime(date);
+ testDateCal.set(Calendar.MILLISECOND, 0);
+ Date originalDate = testDateCal.getTime();
+
+ testDateCal.add(Calendar.SECOND, -1);
+
+ Date timeAfter = getTimeAfter(testDateCal.getTime());
+
+ return ((timeAfter != null) && (timeAfter.equals(originalDate)));
+ }
+
+ /**
+ * Returns the next date/time after the given date/time which
+ * satisfies the cron expression.
+ *
+ * @param date the date/time at which to begin the search for the next valid
+ * date/time
+ * @return the next valid date/time
+ */
+ public Date getNextValidTimeAfter(Date date) {
+ return getTimeAfter(date);
+ }
+
+ /**
+ * Returns the next date/time after the given date/time which does
+ * not satisfy the expression
+ *
+ * @param date the date/time at which to begin the search for the next
+ * invalid date/time
+ * @return the next valid date/time
+ */
+ public Date getNextInvalidTimeAfter(Date date) {
+ long difference = 1000;
+
+ //move back to the nearest second so differences will be accurate
+ Calendar adjustCal = Calendar.getInstance(getTimeZone());
+ adjustCal.setTime(date);
+ adjustCal.set(Calendar.MILLISECOND, 0);
+ Date lastDate = adjustCal.getTime();
+
+ Date newDate;
+
+ //FUTURE_TODO: (QUARTZ-481) IMPROVE THIS! The following is a BAD solution to this problem. Performance will be very bad here, depending on the cron expression. It is, however A solution.
+
+ //keep getting the next included time until it's farther than one second
+ // apart. At that point, lastDate is the last valid fire time. We return
+ // the second immediately following it.
+ while (difference == 1000) {
+ newDate = getTimeAfter(lastDate);
+ if(newDate == null)
+ break;
+
+ difference = newDate.getTime() - lastDate.getTime();
+
+ if (difference == 1000) {
+ lastDate = newDate;
+ }
+ }
+
+ return new Date(lastDate.getTime() + 1000);
+ }
+
+ /**
+ * Returns the time zone for which this CronExpression
+ * will be resolved.
+ */
+ public TimeZone getTimeZone() {
+ if (timeZone == null) {
+ timeZone = TimeZone.getDefault();
+ }
+
+ return timeZone;
+ }
+
+ /**
+ * Sets the time zone for which this CronExpression
+ * will be resolved.
+ */
+ public void setTimeZone(TimeZone timeZone) {
+ this.timeZone = timeZone;
+ }
+
+ /**
+ * Returns the string representation of the CronExpression
+ *
+ * @return a string representation of the CronExpression
+ */
+ @Override
+ public String toString() {
+ return cronExpression;
+ }
+
+ /**
+ * Indicates whether the specified cron expression can be parsed into a
+ * valid cron expression
+ *
+ * @param cronExpression the expression to evaluate
+ * @return a boolean indicating whether the given expression is a valid cron
+ * expression
+ */
+ public static boolean isValidExpression(String cronExpression) {
+
+ try {
+ new CronExpression(cronExpression);
+ } catch (ParseException pe) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public static void validateExpression(String cronExpression) throws ParseException {
+
+ new CronExpression(cronExpression);
+ }
+
+
+ ////////////////////////////////////////////////////////////////////////////
+ //
+ // Expression Parsing Functions
+ //
+ ////////////////////////////////////////////////////////////////////////////
+
+ protected void buildExpression(String expression) throws ParseException {
+ expressionParsed = true;
+
+ try {
+
+ if (seconds == null) {
+ seconds = new TreeSet();
+ }
+ if (minutes == null) {
+ minutes = new TreeSet();
+ }
+ if (hours == null) {
+ hours = new TreeSet();
+ }
+ if (daysOfMonth == null) {
+ daysOfMonth = new TreeSet();
+ }
+ if (months == null) {
+ months = new TreeSet();
+ }
+ if (daysOfWeek == null) {
+ daysOfWeek = new TreeSet();
+ }
+ if (years == null) {
+ years = new TreeSet();
+ }
+
+ int exprOn = SECOND;
+
+ StringTokenizer exprsTok = new StringTokenizer(expression, " \t",
+ false);
+
+ while (exprsTok.hasMoreTokens() && exprOn <= YEAR) {
+ String expr = exprsTok.nextToken().trim();
+
+ // throw an exception if L is used with other days of the month
+ if(exprOn == DAY_OF_MONTH && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) {
+ throw new ParseException("Support for specifying 'L' and 'LW' with other days of the month is not implemented", -1);
+ }
+ // throw an exception if L is used with other days of the week
+ if(exprOn == DAY_OF_WEEK && expr.indexOf('L') != -1 && expr.length() > 1 && expr.contains(",")) {
+ throw new ParseException("Support for specifying 'L' with other days of the week is not implemented", -1);
+ }
+ if(exprOn == DAY_OF_WEEK && expr.indexOf('#') != -1 && expr.indexOf('#', expr.indexOf('#') +1) != -1) {
+ throw new ParseException("Support for specifying multiple \"nth\" days is not implemented.", -1);
+ }
+
+ StringTokenizer vTok = new StringTokenizer(expr, ",");
+ while (vTok.hasMoreTokens()) {
+ String v = vTok.nextToken();
+ storeExpressionVals(0, v, exprOn);
+ }
+
+ exprOn++;
+ }
+
+ if (exprOn <= DAY_OF_WEEK) {
+ throw new ParseException("Unexpected end of expression.",
+ expression.length());
+ }
+
+ if (exprOn <= YEAR) {
+ storeExpressionVals(0, "*", YEAR);
+ }
+
+ TreeSet dow = getSet(DAY_OF_WEEK);
+ TreeSet dom = getSet(DAY_OF_MONTH);
+
+ // Copying the logic from the UnsupportedOperationException below
+ boolean dayOfMSpec = !dom.contains(NO_SPEC);
+ boolean dayOfWSpec = !dow.contains(NO_SPEC);
+
+ if (!dayOfMSpec || dayOfWSpec) {
+ if (!dayOfWSpec || dayOfMSpec) {
+ throw new ParseException(
+ "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented.", 0);
+ }
+ }
+ } catch (ParseException pe) {
+ throw pe;
+ } catch (Exception e) {
+ throw new ParseException("Illegal cron expression format ("
+ + e.toString() + ")", 0);
+ }
+ }
+
+ protected int storeExpressionVals(int pos, String s, int type)
+ throws ParseException {
+
+ int incr = 0;
+ int i = skipWhiteSpace(pos, s);
+ if (i >= s.length()) {
+ return i;
+ }
+ char c = s.charAt(i);
+ if ((c >= 'A') && (c <= 'Z') && (!s.equals("L")) && (!s.equals("LW")) && (!s.matches("^L-[0-9]*[W]?"))) {
+ String sub = s.substring(i, i + 3);
+ int sval = -1;
+ int eval = -1;
+ if (type == MONTH) {
+ sval = getMonthNumber(sub) + 1;
+ if (sval <= 0) {
+ throw new ParseException("Invalid Month value: '" + sub + "'", i);
+ }
+ if (s.length() > i + 3) {
+ c = s.charAt(i + 3);
+ if (c == '-') {
+ i += 4;
+ sub = s.substring(i, i + 3);
+ eval = getMonthNumber(sub) + 1;
+ if (eval <= 0) {
+ throw new ParseException("Invalid Month value: '" + sub + "'", i);
+ }
+ }
+ }
+ } else if (type == DAY_OF_WEEK) {
+ sval = getDayOfWeekNumber(sub);
+ if (sval < 0) {
+ throw new ParseException("Invalid Day-of-Week value: '"
+ + sub + "'", i);
+ }
+ if (s.length() > i + 3) {
+ c = s.charAt(i + 3);
+ if (c == '-') {
+ i += 4;
+ sub = s.substring(i, i + 3);
+ eval = getDayOfWeekNumber(sub);
+ if (eval < 0) {
+ throw new ParseException(
+ "Invalid Day-of-Week value: '" + sub
+ + "'", i);
+ }
+ } else if (c == '#') {
+ try {
+ i += 4;
+ nthdayOfWeek = Integer.parseInt(s.substring(i));
+ if (nthdayOfWeek < 1 || nthdayOfWeek > 5) {
+ throw new Exception();
+ }
+ } catch (Exception e) {
+ throw new ParseException(
+ "A numeric value between 1 and 5 must follow the '#' option",
+ i);
+ }
+ } else if (c == 'L') {
+ lastdayOfWeek = true;
+ i++;
+ }
+ }
+
+ } else {
+ throw new ParseException(
+ "Illegal characters for this position: '" + sub + "'",
+ i);
+ }
+ if (eval != -1) {
+ incr = 1;
+ }
+ addToSet(sval, eval, incr, type);
+ return (i + 3);
+ }
+
+ if (c == '?') {
+ i++;
+ if ((i + 1) < s.length()
+ && (s.charAt(i) != ' ' && s.charAt(i + 1) != '\t')) {
+ throw new ParseException("Illegal character after '?': "
+ + s.charAt(i), i);
+ }
+ if (type != DAY_OF_WEEK && type != DAY_OF_MONTH) {
+ throw new ParseException(
+ "'?' can only be specified for Day-of-Month or Day-of-Week.",
+ i);
+ }
+ if (type == DAY_OF_WEEK && !lastdayOfMonth) {
+ int val = daysOfMonth.last();
+ if (val == NO_SPEC_INT) {
+ throw new ParseException(
+ "'?' can only be specified for Day-of-Month -OR- Day-of-Week.",
+ i);
+ }
+ }
+
+ addToSet(NO_SPEC_INT, -1, 0, type);
+ return i;
+ }
+
+ if (c == '*' || c == '/') {
+ if (c == '*' && (i + 1) >= s.length()) {
+ addToSet(ALL_SPEC_INT, -1, incr, type);
+ return i + 1;
+ } else if (c == '/'
+ && ((i + 1) >= s.length() || s.charAt(i + 1) == ' ' || s
+ .charAt(i + 1) == '\t')) {
+ throw new ParseException("'/' must be followed by an integer.", i);
+ } else if (c == '*') {
+ i++;
+ }
+ c = s.charAt(i);
+ if (c == '/') { // is an increment specified?
+ i++;
+ if (i >= s.length()) {
+ throw new ParseException("Unexpected end of string.", i);
+ }
+
+ incr = getNumericValue(s, i);
+
+ i++;
+ if (incr > 10) {
+ i++;
+ }
+ checkIncrementRange(incr, type, i);
+ } else {
+ incr = 1;
+ }
+
+ addToSet(ALL_SPEC_INT, -1, incr, type);
+ return i;
+ } else if (c == 'L') {
+ i++;
+ if (type == DAY_OF_MONTH) {
+ lastdayOfMonth = true;
+ }
+ if (type == DAY_OF_WEEK) {
+ addToSet(7, 7, 0, type);
+ }
+ if(type == DAY_OF_MONTH && s.length() > i) {
+ c = s.charAt(i);
+ if(c == '-') {
+ ValueSet vs = getValue(0, s, i+1);
+ lastdayOffset = vs.value;
+ if(lastdayOffset > 30)
+ throw new ParseException("Offset from last day must be <= 30", i+1);
+ i = vs.pos;
+ }
+ if(s.length() > i) {
+ c = s.charAt(i);
+ if(c == 'W') {
+ nearestWeekday = true;
+ i++;
+ }
+ }
+ }
+ return i;
+ } else if (c >= '0' && c <= '9') {
+ int val = Integer.parseInt(String.valueOf(c));
+ i++;
+ if (i >= s.length()) {
+ addToSet(val, -1, -1, type);
+ } else {
+ c = s.charAt(i);
+ if (c >= '0' && c <= '9') {
+ ValueSet vs = getValue(val, s, i);
+ val = vs.value;
+ i = vs.pos;
+ }
+ i = checkNext(i, s, val, type);
+ return i;
+ }
+ } else {
+ throw new ParseException("Unexpected character: " + c, i);
+ }
+
+ return i;
+ }
+
+ private void checkIncrementRange(int incr, int type, int idxPos) throws ParseException {
+ if (incr > 59 && (type == SECOND || type == MINUTE)) {
+ throw new ParseException("Increment > 60 : " + incr, idxPos);
+ } else if (incr > 23 && (type == HOUR)) {
+ throw new ParseException("Increment > 24 : " + incr, idxPos);
+ } else if (incr > 31 && (type == DAY_OF_MONTH)) {
+ throw new ParseException("Increment > 31 : " + incr, idxPos);
+ } else if (incr > 7 && (type == DAY_OF_WEEK)) {
+ throw new ParseException("Increment > 7 : " + incr, idxPos);
+ } else if (incr > 12 && (type == MONTH)) {
+ throw new ParseException("Increment > 12 : " + incr, idxPos);
+ }
+ }
+
+ protected int checkNext(int pos, String s, int val, int type)
+ throws ParseException {
+
+ int end = -1;
+ int i = pos;
+
+ if (i >= s.length()) {
+ addToSet(val, end, -1, type);
+ return i;
+ }
+
+ char c = s.charAt(pos);
+
+ if (c == 'L') {
+ if (type == DAY_OF_WEEK) {
+ if(val < 1 || val > 7)
+ throw new ParseException("Day-of-Week values must be between 1 and 7", -1);
+ lastdayOfWeek = true;
+ } else {
+ throw new ParseException("'L' option is not valid here. (pos=" + i + ")", i);
+ }
+ TreeSet set = getSet(type);
+ set.add(val);
+ i++;
+ return i;
+ }
+
+ if (c == 'W') {
+ if (type == DAY_OF_MONTH) {
+ nearestWeekday = true;
+ } else {
+ throw new ParseException("'W' option is not valid here. (pos=" + i + ")", i);
+ }
+ if(val > 31)
+ throw new ParseException("The 'W' option does not make sense with values larger than 31 (max number of days in a month)", i);
+ TreeSet set = getSet(type);
+ set.add(val);
+ i++;
+ return i;
+ }
+
+ if (c == '#') {
+ if (type != DAY_OF_WEEK) {
+ throw new ParseException("'#' option is not valid here. (pos=" + i + ")", i);
+ }
+ i++;
+ try {
+ nthdayOfWeek = Integer.parseInt(s.substring(i));
+ if (nthdayOfWeek < 1 || nthdayOfWeek > 5) {
+ throw new Exception();
+ }
+ } catch (Exception e) {
+ throw new ParseException(
+ "A numeric value between 1 and 5 must follow the '#' option",
+ i);
+ }
+
+ TreeSet set = getSet(type);
+ set.add(val);
+ i++;
+ return i;
+ }
+
+ if (c == '-') {
+ i++;
+ c = s.charAt(i);
+ int v = Integer.parseInt(String.valueOf(c));
+ end = v;
+ i++;
+ if (i >= s.length()) {
+ addToSet(val, end, 1, type);
+ return i;
+ }
+ c = s.charAt(i);
+ if (c >= '0' && c <= '9') {
+ ValueSet vs = getValue(v, s, i);
+ end = vs.value;
+ i = vs.pos;
+ }
+ if (i < s.length() && ((c = s.charAt(i)) == '/')) {
+ i++;
+ c = s.charAt(i);
+ int v2 = Integer.parseInt(String.valueOf(c));
+ i++;
+ if (i >= s.length()) {
+ addToSet(val, end, v2, type);
+ return i;
+ }
+ c = s.charAt(i);
+ if (c >= '0' && c <= '9') {
+ ValueSet vs = getValue(v2, s, i);
+ int v3 = vs.value;
+ addToSet(val, end, v3, type);
+ i = vs.pos;
+ return i;
+ } else {
+ addToSet(val, end, v2, type);
+ return i;
+ }
+ } else {
+ addToSet(val, end, 1, type);
+ return i;
+ }
+ }
+
+ if (c == '/') {
+ if ((i + 1) >= s.length() || s.charAt(i + 1) == ' ' || s.charAt(i + 1) == '\t') {
+ throw new ParseException("'/' must be followed by an integer.", i);
+ }
+
+ i++;
+ c = s.charAt(i);
+ int v2 = Integer.parseInt(String.valueOf(c));
+ i++;
+ if (i >= s.length()) {
+ checkIncrementRange(v2, type, i);
+ addToSet(val, end, v2, type);
+ return i;
+ }
+ c = s.charAt(i);
+ if (c >= '0' && c <= '9') {
+ ValueSet vs = getValue(v2, s, i);
+ int v3 = vs.value;
+ checkIncrementRange(v3, type, i);
+ addToSet(val, end, v3, type);
+ i = vs.pos;
+ return i;
+ } else {
+ throw new ParseException("Unexpected character '" + c + "' after '/'", i);
+ }
+ }
+
+ addToSet(val, end, 0, type);
+ i++;
+ return i;
+ }
+
+ public String getCronExpression() {
+ return cronExpression;
+ }
+
+ public String getExpressionSummary() {
+ StringBuilder buf = new StringBuilder();
+
+ buf.append("seconds: ");
+ buf.append(getExpressionSetSummary(seconds));
+ buf.append("\n");
+ buf.append("minutes: ");
+ buf.append(getExpressionSetSummary(minutes));
+ buf.append("\n");
+ buf.append("hours: ");
+ buf.append(getExpressionSetSummary(hours));
+ buf.append("\n");
+ buf.append("daysOfMonth: ");
+ buf.append(getExpressionSetSummary(daysOfMonth));
+ buf.append("\n");
+ buf.append("months: ");
+ buf.append(getExpressionSetSummary(months));
+ buf.append("\n");
+ buf.append("daysOfWeek: ");
+ buf.append(getExpressionSetSummary(daysOfWeek));
+ buf.append("\n");
+ buf.append("lastdayOfWeek: ");
+ buf.append(lastdayOfWeek);
+ buf.append("\n");
+ buf.append("nearestWeekday: ");
+ buf.append(nearestWeekday);
+ buf.append("\n");
+ buf.append("NthDayOfWeek: ");
+ buf.append(nthdayOfWeek);
+ buf.append("\n");
+ buf.append("lastdayOfMonth: ");
+ buf.append(lastdayOfMonth);
+ buf.append("\n");
+ buf.append("years: ");
+ buf.append(getExpressionSetSummary(years));
+ buf.append("\n");
+
+ return buf.toString();
+ }
+
+ protected String getExpressionSetSummary(java.util.Set set) {
+
+ if (set.contains(NO_SPEC)) {
+ return "?";
+ }
+ if (set.contains(ALL_SPEC)) {
+ return "*";
+ }
+
+ StringBuilder buf = new StringBuilder();
+
+ Iterator itr = set.iterator();
+ boolean first = true;
+ while (itr.hasNext()) {
+ Integer iVal = itr.next();
+ String val = iVal.toString();
+ if (!first) {
+ buf.append(",");
+ }
+ buf.append(val);
+ first = false;
+ }
+
+ return buf.toString();
+ }
+
+ protected String getExpressionSetSummary(java.util.ArrayList list) {
+
+ if (list.contains(NO_SPEC)) {
+ return "?";
+ }
+ if (list.contains(ALL_SPEC)) {
+ return "*";
+ }
+
+ StringBuilder buf = new StringBuilder();
+
+ Iterator itr = list.iterator();
+ boolean first = true;
+ while (itr.hasNext()) {
+ Integer iVal = itr.next();
+ String val = iVal.toString();
+ if (!first) {
+ buf.append(",");
+ }
+ buf.append(val);
+ first = false;
+ }
+
+ return buf.toString();
+ }
+
+ protected int skipWhiteSpace(int i, String s) {
+ for (; i < s.length() && (s.charAt(i) == ' ' || s.charAt(i) == '\t'); i++) {
+ ;
+ }
+
+ return i;
+ }
+
+ protected int findNextWhiteSpace(int i, String s) {
+ for (; i < s.length() && (s.charAt(i) != ' ' || s.charAt(i) != '\t'); i++) {
+ ;
+ }
+
+ return i;
+ }
+
+ protected void addToSet(int val, int end, int incr, int type)
+ throws ParseException {
+
+ TreeSet set = getSet(type);
+
+ if (type == SECOND || type == MINUTE) {
+ if ((val < 0 || val > 59 || end > 59) && (val != ALL_SPEC_INT)) {
+ throw new ParseException(
+ "Minute and Second values must be between 0 and 59",
+ -1);
+ }
+ } else if (type == HOUR) {
+ if ((val < 0 || val > 23 || end > 23) && (val != ALL_SPEC_INT)) {
+ throw new ParseException(
+ "Hour values must be between 0 and 23", -1);
+ }
+ } else if (type == DAY_OF_MONTH) {
+ if ((val < 1 || val > 31 || end > 31) && (val != ALL_SPEC_INT)
+ && (val != NO_SPEC_INT)) {
+ throw new ParseException(
+ "Day of month values must be between 1 and 31", -1);
+ }
+ } else if (type == MONTH) {
+ if ((val < 1 || val > 12 || end > 12) && (val != ALL_SPEC_INT)) {
+ throw new ParseException(
+ "Month values must be between 1 and 12", -1);
+ }
+ } else if (type == DAY_OF_WEEK) {
+ if ((val == 0 || val > 7 || end > 7) && (val != ALL_SPEC_INT)
+ && (val != NO_SPEC_INT)) {
+ throw new ParseException(
+ "Day-of-Week values must be between 1 and 7", -1);
+ }
+ }
+
+ if ((incr == 0 || incr == -1) && val != ALL_SPEC_INT) {
+ if (val != -1) {
+ set.add(val);
+ } else {
+ set.add(NO_SPEC);
+ }
+
+ return;
+ }
+
+ int startAt = val;
+ int stopAt = end;
+
+ if (val == ALL_SPEC_INT && incr <= 0) {
+ incr = 1;
+ set.add(ALL_SPEC); // put in a marker, but also fill values
+ }
+
+ if (type == SECOND || type == MINUTE) {
+ if (stopAt == -1) {
+ stopAt = 59;
+ }
+ if (startAt == -1 || startAt == ALL_SPEC_INT) {
+ startAt = 0;
+ }
+ } else if (type == HOUR) {
+ if (stopAt == -1) {
+ stopAt = 23;
+ }
+ if (startAt == -1 || startAt == ALL_SPEC_INT) {
+ startAt = 0;
+ }
+ } else if (type == DAY_OF_MONTH) {
+ if (stopAt == -1) {
+ stopAt = 31;
+ }
+ if (startAt == -1 || startAt == ALL_SPEC_INT) {
+ startAt = 1;
+ }
+ } else if (type == MONTH) {
+ if (stopAt == -1) {
+ stopAt = 12;
+ }
+ if (startAt == -1 || startAt == ALL_SPEC_INT) {
+ startAt = 1;
+ }
+ } else if (type == DAY_OF_WEEK) {
+ if (stopAt == -1) {
+ stopAt = 7;
+ }
+ if (startAt == -1 || startAt == ALL_SPEC_INT) {
+ startAt = 1;
+ }
+ } else if (type == YEAR) {
+ if (stopAt == -1) {
+ stopAt = MAX_YEAR;
+ }
+ if (startAt == -1 || startAt == ALL_SPEC_INT) {
+ startAt = 1970;
+ }
+ }
+
+ // if the end of the range is before the start, then we need to overflow into
+ // the next day, month etc. This is done by adding the maximum amount for that
+ // type, and using modulus max to determine the value being added.
+ int max = -1;
+ if (stopAt < startAt) {
+ switch (type) {
+ case SECOND : max = 60; break;
+ case MINUTE : max = 60; break;
+ case HOUR : max = 24; break;
+ case MONTH : max = 12; break;
+ case DAY_OF_WEEK : max = 7; break;
+ case DAY_OF_MONTH : max = 31; break;
+ case YEAR : throw new IllegalArgumentException("Start year must be less than stop year");
+ default : throw new IllegalArgumentException("Unexpected type encountered");
+ }
+ stopAt += max;
+ }
+
+ for (int i = startAt; i <= stopAt; i += incr) {
+ if (max == -1) {
+ // ie: there's no max to overflow over
+ set.add(i);
+ } else {
+ // take the modulus to get the real value
+ int i2 = i % max;
+
+ // 1-indexed ranges should not include 0, and should include their max
+ if (i2 == 0 && (type == MONTH || type == DAY_OF_WEEK || type == DAY_OF_MONTH) ) {
+ i2 = max;
+ }
+
+ set.add(i2);
+ }
+ }
+ }
+
+ TreeSet getSet(int type) {
+ switch (type) {
+ case SECOND:
+ return seconds;
+ case MINUTE:
+ return minutes;
+ case HOUR:
+ return hours;
+ case DAY_OF_MONTH:
+ return daysOfMonth;
+ case MONTH:
+ return months;
+ case DAY_OF_WEEK:
+ return daysOfWeek;
+ case YEAR:
+ return years;
+ default:
+ return null;
+ }
+ }
+
+ protected ValueSet getValue(int v, String s, int i) {
+ char c = s.charAt(i);
+ StringBuilder s1 = new StringBuilder(String.valueOf(v));
+ while (c >= '0' && c <= '9') {
+ s1.append(c);
+ i++;
+ if (i >= s.length()) {
+ break;
+ }
+ c = s.charAt(i);
+ }
+ ValueSet val = new ValueSet();
+
+ val.pos = (i < s.length()) ? i : i + 1;
+ val.value = Integer.parseInt(s1.toString());
+ return val;
+ }
+
+ protected int getNumericValue(String s, int i) {
+ int endOfVal = findNextWhiteSpace(i, s);
+ String val = s.substring(i, endOfVal);
+ return Integer.parseInt(val);
+ }
+
+ protected int getMonthNumber(String s) {
+ Integer integer = monthMap.get(s);
+
+ if (integer == null) {
+ return -1;
+ }
+
+ return integer;
+ }
+
+ protected int getDayOfWeekNumber(String s) {
+ Integer integer = dayMap.get(s);
+
+ if (integer == null) {
+ return -1;
+ }
+
+ return integer;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ //
+ // Computation Functions
+ //
+ ////////////////////////////////////////////////////////////////////////////
+
+ public Date getTimeAfter(Date afterTime) {
+
+ // Computation is based on Gregorian year only.
+ Calendar cl = new java.util.GregorianCalendar(getTimeZone());
+
+ // move ahead one second, since we're computing the time *after* the
+ // given time
+ afterTime = new Date(afterTime.getTime() + 1000);
+ // CronTrigger does not deal with milliseconds
+ cl.setTime(afterTime);
+ cl.set(Calendar.MILLISECOND, 0);
+
+ boolean gotOne = false;
+ // loop until we've computed the next time, or we've past the endTime
+ while (!gotOne) {
+
+ //if (endTime != null && cl.getTime().after(endTime)) return null;
+ if(cl.get(Calendar.YEAR) > 2999) { // prevent endless loop...
+ return null;
+ }
+
+ SortedSet st = null;
+ int t = 0;
+
+ int sec = cl.get(Calendar.SECOND);
+ int min = cl.get(Calendar.MINUTE);
+
+ // get second.................................................
+ st = seconds.tailSet(sec);
+ if (st != null && st.size() != 0) {
+ sec = st.first();
+ } else {
+ sec = seconds.first();
+ min++;
+ cl.set(Calendar.MINUTE, min);
+ }
+ cl.set(Calendar.SECOND, sec);
+
+ min = cl.get(Calendar.MINUTE);
+ int hr = cl.get(Calendar.HOUR_OF_DAY);
+ t = -1;
+
+ // get minute.................................................
+ st = minutes.tailSet(min);
+ if (st != null && st.size() != 0) {
+ t = min;
+ min = st.first();
+ } else {
+ min = minutes.first();
+ hr++;
+ }
+ if (min != t) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, min);
+ setCalendarHour(cl, hr);
+ continue;
+ }
+ cl.set(Calendar.MINUTE, min);
+
+ hr = cl.get(Calendar.HOUR_OF_DAY);
+ int day = cl.get(Calendar.DAY_OF_MONTH);
+ t = -1;
+
+ // get hour...................................................
+ st = hours.tailSet(hr);
+ if (st != null && st.size() != 0) {
+ t = hr;
+ hr = st.first();
+ } else {
+ hr = hours.first();
+ day++;
+ }
+ if (hr != t) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.DAY_OF_MONTH, day);
+ setCalendarHour(cl, hr);
+ continue;
+ }
+ cl.set(Calendar.HOUR_OF_DAY, hr);
+
+ day = cl.get(Calendar.DAY_OF_MONTH);
+ int mon = cl.get(Calendar.MONTH) + 1;
+ // '+ 1' because calendar is 0-based for this field, and we are
+ // 1-based
+ t = -1;
+ int tmon = mon;
+
+ // get day...................................................
+ boolean dayOfMSpec = !daysOfMonth.contains(NO_SPEC);
+ boolean dayOfWSpec = !daysOfWeek.contains(NO_SPEC);
+ if (dayOfMSpec && !dayOfWSpec) { // get day by day of month rule
+ st = daysOfMonth.tailSet(day);
+ if (lastdayOfMonth) {
+ if(!nearestWeekday) {
+ t = day;
+ day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+ day -= lastdayOffset;
+ if(t > day) {
+ mon++;
+ if(mon > 12) {
+ mon = 1;
+ tmon = 3333; // ensure test of mon != tmon further below fails
+ cl.add(Calendar.YEAR, 1);
+ }
+ day = 1;
+ }
+ } else {
+ t = day;
+ day = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+ day -= lastdayOffset;
+
+ java.util.Calendar tcal = java.util.Calendar.getInstance(getTimeZone());
+ tcal.set(Calendar.SECOND, 0);
+ tcal.set(Calendar.MINUTE, 0);
+ tcal.set(Calendar.HOUR_OF_DAY, 0);
+ tcal.set(Calendar.DAY_OF_MONTH, day);
+ tcal.set(Calendar.MONTH, mon - 1);
+ tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR));
+
+ int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+ int dow = tcal.get(Calendar.DAY_OF_WEEK);
+
+ if(dow == Calendar.SATURDAY && day == 1) {
+ day += 2;
+ } else if(dow == Calendar.SATURDAY) {
+ day -= 1;
+ } else if(dow == Calendar.SUNDAY && day == ldom) {
+ day -= 2;
+ } else if(dow == Calendar.SUNDAY) {
+ day += 1;
+ }
+
+ tcal.set(Calendar.SECOND, sec);
+ tcal.set(Calendar.MINUTE, min);
+ tcal.set(Calendar.HOUR_OF_DAY, hr);
+ tcal.set(Calendar.DAY_OF_MONTH, day);
+ tcal.set(Calendar.MONTH, mon - 1);
+ Date nTime = tcal.getTime();
+ if(nTime.before(afterTime)) {
+ day = 1;
+ mon++;
+ }
+ }
+ } else if(nearestWeekday) {
+ t = day;
+ day = daysOfMonth.first();
+
+ java.util.Calendar tcal = java.util.Calendar.getInstance(getTimeZone());
+ tcal.set(Calendar.SECOND, 0);
+ tcal.set(Calendar.MINUTE, 0);
+ tcal.set(Calendar.HOUR_OF_DAY, 0);
+ tcal.set(Calendar.DAY_OF_MONTH, day);
+ tcal.set(Calendar.MONTH, mon - 1);
+ tcal.set(Calendar.YEAR, cl.get(Calendar.YEAR));
+
+ int ldom = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+ int dow = tcal.get(Calendar.DAY_OF_WEEK);
+
+ if(dow == Calendar.SATURDAY && day == 1) {
+ day += 2;
+ } else if(dow == Calendar.SATURDAY) {
+ day -= 1;
+ } else if(dow == Calendar.SUNDAY && day == ldom) {
+ day -= 2;
+ } else if(dow == Calendar.SUNDAY) {
+ day += 1;
+ }
+
+
+ tcal.set(Calendar.SECOND, sec);
+ tcal.set(Calendar.MINUTE, min);
+ tcal.set(Calendar.HOUR_OF_DAY, hr);
+ tcal.set(Calendar.DAY_OF_MONTH, day);
+ tcal.set(Calendar.MONTH, mon - 1);
+ Date nTime = tcal.getTime();
+ if(nTime.before(afterTime)) {
+ day = daysOfMonth.first();
+ mon++;
+ }
+ } else if (st != null && st.size() != 0) {
+ t = day;
+ day = st.first();
+ // make sure we don't over-run a short month, such as february
+ int lastDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+ if (day > lastDay) {
+ day = daysOfMonth.first();
+ mon++;
+ }
+ } else {
+ day = daysOfMonth.first();
+ mon++;
+ }
+
+ if (day != t || mon != tmon) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, day);
+ cl.set(Calendar.MONTH, mon - 1);
+ // '- 1' because calendar is 0-based for this field, and we
+ // are 1-based
+ continue;
+ }
+ } else if (dayOfWSpec && !dayOfMSpec) { // get day by day of week rule
+ if (lastdayOfWeek) { // are we looking for the last XXX day of
+ // the month?
+ int dow = daysOfWeek.first(); // desired
+ // d-o-w
+ int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w
+ int daysToAdd = 0;
+ if (cDow < dow) {
+ daysToAdd = dow - cDow;
+ }
+ if (cDow > dow) {
+ daysToAdd = dow + (7 - cDow);
+ }
+
+ int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+
+ if (day + daysToAdd > lDay) { // did we already miss the
+ // last one?
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, 1);
+ cl.set(Calendar.MONTH, mon);
+ // no '- 1' here because we are promoting the month
+ continue;
+ }
+
+ // find date of last occurrence of this day in this month...
+ while ((day + daysToAdd + 7) <= lDay) {
+ daysToAdd += 7;
+ }
+
+ day += daysToAdd;
+
+ if (daysToAdd > 0) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, day);
+ cl.set(Calendar.MONTH, mon - 1);
+ // '- 1' here because we are not promoting the month
+ continue;
+ }
+
+ } else if (nthdayOfWeek != 0) {
+ // are we looking for the Nth XXX day in the month?
+ int dow = daysOfWeek.first(); // desired
+ // d-o-w
+ int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w
+ int daysToAdd = 0;
+ if (cDow < dow) {
+ daysToAdd = dow - cDow;
+ } else if (cDow > dow) {
+ daysToAdd = dow + (7 - cDow);
+ }
+
+ boolean dayShifted = false;
+ if (daysToAdd > 0) {
+ dayShifted = true;
+ }
+
+ day += daysToAdd;
+ int weekOfMonth = day / 7;
+ if (day % 7 > 0) {
+ weekOfMonth++;
+ }
+
+ daysToAdd = (nthdayOfWeek - weekOfMonth) * 7;
+ day += daysToAdd;
+ if (daysToAdd < 0
+ || day > getLastDayOfMonth(mon, cl
+ .get(Calendar.YEAR))) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, 1);
+ cl.set(Calendar.MONTH, mon);
+ // no '- 1' here because we are promoting the month
+ continue;
+ } else if (daysToAdd > 0 || dayShifted) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, day);
+ cl.set(Calendar.MONTH, mon - 1);
+ // '- 1' here because we are NOT promoting the month
+ continue;
+ }
+ } else {
+ int cDow = cl.get(Calendar.DAY_OF_WEEK); // current d-o-w
+ int dow = daysOfWeek.first(); // desired
+ // d-o-w
+ st = daysOfWeek.tailSet(cDow);
+ if (st != null && st.size() > 0) {
+ dow = st.first();
+ }
+
+ int daysToAdd = 0;
+ if (cDow < dow) {
+ daysToAdd = dow - cDow;
+ }
+ if (cDow > dow) {
+ daysToAdd = dow + (7 - cDow);
+ }
+
+ int lDay = getLastDayOfMonth(mon, cl.get(Calendar.YEAR));
+
+ if (day + daysToAdd > lDay) { // will we pass the end of
+ // the month?
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, 1);
+ cl.set(Calendar.MONTH, mon);
+ // no '- 1' here because we are promoting the month
+ continue;
+ } else if (daysToAdd > 0) { // are we swithing days?
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, day + daysToAdd);
+ cl.set(Calendar.MONTH, mon - 1);
+ // '- 1' because calendar is 0-based for this field,
+ // and we are 1-based
+ continue;
+ }
+ }
+ } else { // dayOfWSpec && !dayOfMSpec
+ throw new UnsupportedOperationException(
+ "Support for specifying both a day-of-week AND a day-of-month parameter is not implemented.");
+ }
+ cl.set(Calendar.DAY_OF_MONTH, day);
+
+ mon = cl.get(Calendar.MONTH) + 1;
+ // '+ 1' because calendar is 0-based for this field, and we are
+ // 1-based
+ int year = cl.get(Calendar.YEAR);
+ t = -1;
+
+ // test for expressions that never generate a valid fire date,
+ // but keep looping...
+ if (year > MAX_YEAR) {
+ return null;
+ }
+
+ // get month...................................................
+ st = months.tailSet(mon);
+ if (st != null && st.size() != 0) {
+ t = mon;
+ mon = st.first();
+ } else {
+ mon = months.first();
+ year++;
+ }
+ if (mon != t) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, 1);
+ cl.set(Calendar.MONTH, mon - 1);
+ // '- 1' because calendar is 0-based for this field, and we are
+ // 1-based
+ cl.set(Calendar.YEAR, year);
+ continue;
+ }
+ cl.set(Calendar.MONTH, mon - 1);
+ // '- 1' because calendar is 0-based for this field, and we are
+ // 1-based
+
+ year = cl.get(Calendar.YEAR);
+ t = -1;
+
+ // get year...................................................
+ st = years.tailSet(year);
+ if (st != null && st.size() != 0) {
+ t = year;
+ year = st.first();
+ } else {
+ return null; // ran out of years...
+ }
+
+ if (year != t) {
+ cl.set(Calendar.SECOND, 0);
+ cl.set(Calendar.MINUTE, 0);
+ cl.set(Calendar.HOUR_OF_DAY, 0);
+ cl.set(Calendar.DAY_OF_MONTH, 1);
+ cl.set(Calendar.MONTH, 0);
+ // '- 1' because calendar is 0-based for this field, and we are
+ // 1-based
+ cl.set(Calendar.YEAR, year);
+ continue;
+ }
+ cl.set(Calendar.YEAR, year);
+
+ gotOne = true;
+ } // while( !done )
+
+ return cl.getTime();
+ }
+
+ /**
+ * Advance the calendar to the particular hour paying particular attention
+ * to daylight saving problems.
+ *
+ * @param cal the calendar to operate on
+ * @param hour the hour to set
+ */
+ protected void setCalendarHour(Calendar cal, int hour) {
+ cal.set(java.util.Calendar.HOUR_OF_DAY, hour);
+ if (cal.get(java.util.Calendar.HOUR_OF_DAY) != hour && hour != 24) {
+ cal.set(java.util.Calendar.HOUR_OF_DAY, hour + 1);
+ }
+ }
+
+ /**
+ * NOT YET IMPLEMENTED: Returns the time before the given time
+ * that the CronExpression matches.
+ */
+ public Date getTimeBefore(Date endTime) {
+ // FUTURE_TODO: implement QUARTZ-423
+ return null;
+ }
+
+ /**
+ * NOT YET IMPLEMENTED: Returns the final time that the
+ * CronExpression will match.
+ */
+ public Date getFinalFireTime() {
+ // FUTURE_TODO: implement QUARTZ-423
+ return null;
+ }
+
+ protected boolean isLeapYear(int year) {
+ return ((year % 4 == 0 && year % 100 != 0) || (year % 400 == 0));
+ }
+
+ protected int getLastDayOfMonth(int monthNum, int year) {
+
+ switch (monthNum) {
+ case 1:
+ return 31;
+ case 2:
+ return (isLeapYear(year)) ? 29 : 28;
+ case 3:
+ return 31;
+ case 4:
+ return 30;
+ case 5:
+ return 31;
+ case 6:
+ return 30;
+ case 7:
+ return 31;
+ case 8:
+ return 31;
+ case 9:
+ return 30;
+ case 10:
+ return 31;
+ case 11:
+ return 30;
+ case 12:
+ return 31;
+ default:
+ throw new IllegalArgumentException("Illegal month number: "
+ + monthNum);
+ }
+ }
+
+
+ private void readObject(java.io.ObjectInputStream stream)
+ throws java.io.IOException, ClassNotFoundException {
+
+ stream.defaultReadObject();
+ try {
+ buildExpression(cronExpression);
+ } catch (Exception ignore) {
+ } // never happens
+ }
+
+ @Override
+ @Deprecated
+ public Object clone() {
+ return new CronExpression(this);
+ }
+}
+
+class ValueSet {
+ public int value;
+ public int pos;
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/snowflake/SnowFlakeIdGenerator.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/snowflake/SnowFlakeIdGenerator.java
new file mode 100644
index 00000000..cf3c3648
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/snowflake/SnowFlakeIdGenerator.java
@@ -0,0 +1,93 @@
+package com.github.kfcfans.oms.server.common.utils.snowflake;
+
+/**
+ * Twitter SnowFlake(Scala -> Java)
+ *
+ * @author tjq
+ * @since 2020/4/6
+ */
+public class SnowFlakeIdGenerator {
+
+ /**
+ * 起始的时间戳
+ */
+ private final static long START_STAMP = 1480166465631L;
+
+ /**
+ * 每一部分占用的位数
+ */
+ private final static long SEQUENCE_BIT = 12; //序列号占用的位数
+ private final static long MACHINE_BIT = 5; //机器标识占用的位数
+ private final static long DATA_CENTER_BIT = 5;//数据中心占用的位数
+
+ /**
+ * 每一部分的最大值
+ */
+ private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
+ private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
+ private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
+
+ /**
+ * 每一部分向左的位移
+ */
+ private final static long MACHINE_LEFT = SEQUENCE_BIT;
+ private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
+ private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
+
+ private long dataCenterId; //数据中心
+ private long machineId; //机器标识
+ private long sequence = 0L; //序列号
+ private long lastTimestamp = -1L;//上一次时间戳
+
+ public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
+ if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
+ throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");
+ }
+ if (machineId > MAX_MACHINE_NUM || machineId < 0) {
+ throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
+ }
+ this.dataCenterId = dataCenterId;
+ this.machineId = machineId;
+ }
+
+ /**
+ * 产生下一个ID
+ */
+ public synchronized long nextId() {
+ long currStamp = getNewStamp();
+ if (currStamp < lastTimestamp) {
+ throw new RuntimeException("Clock moved backwards. Refusing to generate id");
+ }
+
+ if (currStamp == lastTimestamp) {
+ //相同毫秒内,序列号自增
+ sequence = (sequence + 1) & MAX_SEQUENCE;
+ //同一毫秒的序列数已经达到最大
+ if (sequence == 0L) {
+ currStamp = getNextMill();
+ }
+ } else {
+ //不同毫秒内,序列号置为0
+ sequence = 0L;
+ }
+
+ lastTimestamp = currStamp;
+
+ return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分
+ | dataCenterId << DATA_CENTER_LEFT //数据中心部分
+ | machineId << MACHINE_LEFT //机器标识部分
+ | sequence; //序列号部分
+ }
+
+ private long getNextMill() {
+ long mill = getNewStamp();
+ while (mill <= lastTimestamp) {
+ mill = getNewStamp();
+ }
+ return mill;
+ }
+
+ private long getNewStamp() {
+ return System.currentTimeMillis();
+ }
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
index 8eac5e96..97fd7120 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
@@ -26,6 +26,8 @@ public class OhMyServer {
@Getter
private static String actorSystemAddress;
+ private static final String AKKA_PATH = "akka://%s@%s/user/%s";
+
public static void init() {
// 1. 启动 ActorSystem
@@ -51,7 +53,12 @@ public class OhMyServer {
* @return ActorSelection
*/
public static ActorSelection getServerActor(String address) {
- String path = String.format("akka://%s@%s/user/%s", RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME);
+ String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME);
+ return actorSystem.actorSelection(path);
+ }
+
+ public static ActorSelection getTaskTrackerActor(String address) {
+ String path = String.format(AKKA_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java
index 46c7f3a0..629c2037 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java
@@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.core.akka;
import akka.actor.AbstractActor;
+import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
@@ -35,7 +36,19 @@ public class ServerActor extends AbstractActor {
getSender().tell(askResponse, getSelf());
}
+ /**
+ * 处理 Worker 的心跳请求
+ * @param heartbeat 心跳包
+ */
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
WorkerManagerService.updateStatus(heartbeat);
}
+
+ /**
+ * 处理 instance 状态
+ * @param req 任务实例的状态上报请求
+ */
+ private void onReceive(TaskTrackerReportInstanceStatusReq req) {
+
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java
index 80c71b16..98c5849f 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java
@@ -23,6 +23,7 @@ public class AppInfoDO {
private String appName;
private String description;
+ // 当前负责该 appName 旗下任务调度的server地址,IP:Port
private String currentServer;
private Date gmtCreate;
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java
similarity index 58%
rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobLogDO.java
rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java
index e06cc63a..d8359052 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobLogDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java
@@ -1,5 +1,7 @@
package com.github.kfcfans.oms.server.persistence.model;
+import lombok.Data;
+
import javax.persistence.*;
import java.util.Date;
@@ -9,9 +11,10 @@ import java.util.Date;
* @author tjq
* @since 2020/3/30
*/
+@Data
@Entity
-@Table(name = "job_log")
-public class JobLogDO {
+@Table(name = "execute_log", indexes = {@Index(columnList = "jobId")})
+public class ExecuteLogDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@@ -20,13 +23,19 @@ public class JobLogDO {
// 任务ID
private Long jobId;
// 任务实例ID
- private String instanceId;
- // 任务状态 运行中/成功/失败...
+ private Long instanceId;
+ /**
+ * 任务状态 {@link com.github.kfcfans.common.InstanceStatus}
+ */
private int status;
// 执行结果
private String result;
// 耗时
private Long usedTime;
+ // 预计触发时间
+ private Long expectedTriggerTime;
+ // 实际触发时间
+ private Long actualTriggerTime;
private Date gmtCreate;
private Date gmtModified;
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java
index 08065daa..5297441f 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java
@@ -14,7 +14,7 @@ import java.util.Date;
*/
@Data
@Entity
-@Table(name = "job_info")
+@Table(name = "job_info", indexes = {@Index(columnList = "appId")})
public class JobInfoDO {
@@ -29,6 +29,8 @@ public class JobInfoDO {
private String jobDescription;
// 任务所属的应用ID
private Long appId;
+ // 任务自带的参数
+ private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
@@ -45,7 +47,7 @@ public class JobInfoDO {
private String processorInfo;
/* ************************** 运行时配置 ************************** */
- // 最大同时运行任务数
+ // 最大同时运行任务数,默认 1
private Integer maxInstanceNum;
// 并发度,同时执行某个任务的最大线程数量
private Integer concurrency;
@@ -54,6 +56,10 @@ public class JobInfoDO {
// 任务的每一个Task超时时间
private Long taskTimeLimit;
+ /* ************************** 重试配置 ************************** */
+ private Integer instanceRetryNum;
+ private Integer taskRetryNum;
+
// 1 正常运行,2 停止(不再调度)
private Integer status;
// 下一次调度时间
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java
index ac4bc0ae..8bd0472e 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java
@@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
+import java.util.List;
+
/**
* AppInfo 数据访问层
*
@@ -12,4 +14,10 @@ import org.springframework.data.jpa.repository.JpaRepository;
public interface AppInfoRepository extends JpaRepository {
AppInfoDO findByAppName(String appName);
+
+ /**
+ * 根据 currentServer 查询 appId
+ * 其实只需要 id,处于性能考虑可以直接写SQL只返回ID
+ */
+ List findAllByCurrentServer(String currentServer);
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java
new file mode 100644
index 00000000..84acd089
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java
@@ -0,0 +1,23 @@
+package com.github.kfcfans.oms.server.persistence.repository;
+
+import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+
+import java.util.List;
+
+/**
+ * JobLog 数据访问层
+ *
+ * @author tjq
+ * @since 2020/4/1
+ */
+public interface ExecuteLogRepository extends JpaRepository {
+
+ long countByJobIdAndStatusIn(long jobId, List status);
+
+ @Query(value = "update execute_log set status = ?2, result = ?3 where instance_id = ?1", nativeQuery = true)
+ int updateStatusAndLog(long instanceId, int status, String result);
+
+ List findByJobIdIn(List jobIds);
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java
index 1d011e58..00f73962 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java
@@ -14,7 +14,7 @@ import java.util.List;
public interface JobInfoRepository extends JpaRepository {
- List findByAppIdInAndNextTriggerTimeLessThanEqual(List appIds, Long time);
+ List findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time);
- List findByAppIdAndNextTriggerTimeLessThanEqual(Long appId, Long time);
+ List findByAppIdInAndStatusAndTimeExpression(List appIds, int status, int timeExpressionType);
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java
deleted file mode 100644
index f0dd2cdc..00000000
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobLogRepository.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.github.kfcfans.oms.server.persistence.repository;
-
-import com.github.kfcfans.oms.server.persistence.model.JobLogDO;
-import org.springframework.data.jpa.repository.JpaRepository;
-
-/**
- * JobLog 数据访问层
- *
- * @author tjq
- * @since 2020/4/1
- */
-public interface JobLogRepository extends JpaRepository {
-
- long countByJobIdAndStatus(Long jobId, Integer status);
-
-}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java
index ebc3345d..c20bbb3b 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java
@@ -1,10 +1,23 @@
package com.github.kfcfans.oms.server.service;
+import akka.actor.ActorSelection;
+import com.github.kfcfans.common.ExecuteType;
+import com.github.kfcfans.common.ProcessorType;
+import com.github.kfcfans.common.request.ServerScheduleJobReq;
+import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
-import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository;
+import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
+import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
+import java.util.List;
+
+import static com.github.kfcfans.common.InstanceStatus.*;
+
/**
* 派送服务
@@ -12,16 +25,72 @@ import javax.annotation.Resource;
* @author tjq
* @since 2020/4/5
*/
+@Slf4j
@Service
public class DispatchService {
@Resource
- private JobLogRepository jobLogRepository;
+ private ExecuteLogRepository executeLogRepository;
- public void dispatch(JobInfoDO jobInfo) {
+ // 前三个状态都视为运行中
+ private static final List runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV());
- // 1. 查询当前运行的实例数
+ private static final String FAILED_REASON = "%d instance is running";
+ private static final String NO_WORKER_REASON = "no worker available";
+ private static final String EMPTY_RESULT = "";
+ public void dispatch(JobInfoDO jobInfo, long instanceId) {
+ log.debug("[DispatchService] start to dispatch job -> {}.", jobInfo);
+
+ // 查询当前运行的实例数
+ long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus);
+
+ // 超出最大同时运行限制,不执行调度
+ if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
+ String result = String.format(FAILED_REASON, runningInstanceCount);
+ log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount);
+ executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), result);
+
+ return;
+ }
+
+ // 获取 Worker
+ String taskTrackerAddress = WorkerManagerService.chooseBestWorker(jobInfo.getAppId());
+ List allAvailableWorker = WorkerManagerService.getAllAvailableWorker(jobInfo.getAppId());
+
+ if (StringUtils.isEmpty(taskTrackerAddress)) {
+ log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo);
+ executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), NO_WORKER_REASON);
+ return;
+ }
+
+ // 消除非原子操作带来的潜在不一致
+ allAvailableWorker.remove(taskTrackerAddress);
+ allAvailableWorker.add(taskTrackerAddress);
+
+ // 构造请求
+ ServerScheduleJobReq req = new ServerScheduleJobReq();
+ req.setAllWorkerAddress(allAvailableWorker);
+ req.setJobId(jobInfo.getId());
+ req.setInstanceId(instanceId);
+
+ req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name());
+ req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name());
+ req.setProcessorInfo(jobInfo.getProcessorInfo());
+
+ req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit());
+ req.setTaskTimeoutMS(jobInfo.getTaskTimeLimit());
+
+ req.setJobParams(jobInfo.getJobParams());
+ req.setThreadConcurrency(jobInfo.getConcurrency());
+ req.setTaskRetryNum(jobInfo.getTaskRetryNum());
+
+ // 发送请求(不可靠,需要一个后台线程定期轮询状态)
+ ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
+ taskTrackerActor.tell(req, null);
+
+ // 修改状态
+ executeLogRepository.updateStatusAndLog(instanceId, WAITING_WORKER_RECEIVE.getV(), EMPTY_RESULT);
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/IdGenerateService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/IdGenerateService.java
new file mode 100644
index 00000000..67ffc416
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/IdGenerateService.java
@@ -0,0 +1,16 @@
+package com.github.kfcfans.oms.server.service;
+
+/**
+ * 唯一ID生成服务
+ *
+ * @author tjq
+ * @since 2020/4/6
+ */
+public class IdGenerateService {
+
+ public static Long allocate() {
+ // TODO:换成合适的分布式ID生成算法
+ return System.currentTimeMillis();
+ }
+
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java
index 88680757..0f05ccb4 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java
@@ -4,10 +4,12 @@ import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* 管理Worker集群状态
@@ -61,14 +63,40 @@ public class ClusterStatusHolder {
entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore());
for (Map.Entry entry : address2Metrics.entrySet()) {
- long lastActiveTime = address2ActiveTime.getOrDefault(entry.getKey(), -1L);
- long timeout = System.currentTimeMillis() - lastActiveTime;
- if (timeout < WORKER_TIMEOUT_MS) {
- return entry.getKey();
+ String address = entry.getKey();
+ if (available(address)) {
+ return address;
}
}
log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics);
return null;
}
+
+ /**
+ * 获取当前所有可用的 Worker
+ * @return List
+ */
+ public List getAllAvailableWorker() {
+ List workers = Lists.newLinkedList();
+
+ address2Metrics.forEach((address, ignore) -> {
+ if (available(address)) {
+ workers.add(address);
+ }
+ });
+
+ return workers;
+ }
+
+ private boolean available(String address) {
+ SystemMetrics metrics = address2Metrics.get(address);
+ if (metrics.calculateScore() == SystemMetrics.MIN_SCORE) {
+ return false;
+ }
+
+ Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);
+ long timeout = System.currentTimeMillis() - lastActiveTime;
+ return timeout < WORKER_TIMEOUT_MS;
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
index e02fdcbb..004baa42 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
@@ -72,10 +72,7 @@ public class ServerSelectService {
try {
// 可能上一台机器已经完成了Server选举,需要再次判断
- AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> {
- log.error("[ServerSelectService] impossible, unless we just lost our database.");
- return null;
- });
+ AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
if (isActive(appInfo.getCurrentServer())) {
return appInfo.getCurrentServer();
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java
index 963ad96b..ba818f52 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java
@@ -3,10 +3,10 @@ package com.github.kfcfans.oms.server.service.ha;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* Worker 管理服务
@@ -17,7 +17,8 @@ import java.util.Map;
@Slf4j
public class WorkerManagerService {
- private static final Map appName2ClusterStatus = Maps.newConcurrentMap();
+ // 存储Worker健康信息,appId -> ClusterStatusHolder
+ private static final Map appId2ClusterStatus = Maps.newConcurrentMap();
/**
* 更新状态
@@ -26,7 +27,7 @@ public class WorkerManagerService {
public static void updateStatus(WorkerHeartbeat heartbeat) {
Long appId = heartbeat.getAppId();
String appName = heartbeat.getAppName();
- ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
+ ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
clusterStatusHolder.updateStatus(heartbeat);
}
@@ -36,7 +37,7 @@ public class WorkerManagerService {
* @return Worker的地址(null代表没有可用的Worker)
*/
public static String chooseBestWorker(Long appId) {
- ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.get(appId);
+ ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
return null;
@@ -45,11 +46,24 @@ public class WorkerManagerService {
}
/**
- * 获取当前该 Server 管理的所有应用ID
- * @return List
+ * 获取当前所有可用的Worker地址
*/
- public static List listAppIds() {
- return Lists.newArrayList(appName2ClusterStatus.keySet());
+ public static List getAllAvailableWorker(Long appId) {
+ ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
+ if (clusterStatusHolder == null) {
+ log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
+ return Collections.emptyList();
+ }
+ return clusterStatusHolder.getAllAvailableWorker();
+ }
+
+ /**
+ * 清理不需要的worker信息
+ * @param usingAppIds 需要维护的appId,其余的数据将被删除
+ */
+ public static void clean(List usingAppIds) {
+ Set keys = Sets.newHashSet(usingAppIds);
+ appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java
index a29247a7..a9692233 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/schedule/JobScheduleService.java
@@ -1,19 +1,34 @@
package com.github.kfcfans.oms.server.service.schedule;
-import com.github.kfcfans.common.utils.CommonUtils;
+import com.github.kfcfans.common.InstanceStatus;
+import com.github.kfcfans.oms.server.common.constans.JobStatus;
+import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
+import com.github.kfcfans.oms.server.common.utils.CronExpression;
+import com.github.kfcfans.oms.server.core.akka.OhMyServer;
+import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
+import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
+import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
-import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository;
+import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
+import com.github.kfcfans.oms.server.service.DispatchService;
+import com.github.kfcfans.oms.server.service.IdGenerateService;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
-import com.github.kfcfans.oms.server.service.lock.LockService;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
+import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@@ -29,46 +44,163 @@ public class JobScheduleService {
private static final int MAX_BATCH_NUM = 10;
@Resource
- private LockService lockService;
+ private DispatchService dispatchService;
+
+ @Resource
+ private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
- private JobLogRepository jobLogRepository;
+ private ExecuteLogRepository executeLogRepository;
- private static final String SCHEDULE_LOCK = "schedule_lock_%d";
- private static final long SCHEDULE_RATE = 10000;
+ private static final long SCHEDULE_RATE = 5000;
@Scheduled(fixedRate = SCHEDULE_RATE)
- private void getJob() {
- List allAppIds = WorkerManagerService.listAppIds();
- if (CollectionUtils.isEmpty(allAppIds)) {
+ public void timingSchedule() {
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ // 先查询DB,查看本机需要负责的任务
+ List allAppInfos = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress());
+ if (CollectionUtils.isEmpty(allAppInfos)) {
log.info("[JobScheduleService] current server has no app's job to schedule.");
return;
}
+ List allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList());
- long timeThreshold = System.currentTimeMillis() + 2 * SCHEDULE_RATE;
- Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
+ try {
+ scheduleCornJob(allAppIds);
+ }catch (Exception e) {
+ log.error("[JobScheduleService] schedule cron job failed.", e);
+ }
+ log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch);
+ stopwatch.reset();
- List lockNames = partAppIds.stream().map(JobScheduleService::genLock).collect(Collectors.toList());
- // 1. 先批量获取锁,获取不到就改成单个循环模式
- boolean batchLock = lockService.batchLock(lockNames);
- if (!batchLock) {
+ try {
+ scheduleFrequentJob(allAppIds);
+ }catch (Exception e) {
+ log.error("[JobScheduleService] schedule frequent job failed.", e);
+ }
+ log.info("[JobScheduleService] finished frequent schedule, using time {}.", stopwatch);
+ stopwatch.stop();
+ }
- }else {
- try {
- List jobInfos = jobInfoRepository.findByAppIdInAndNextTriggerTimeLessThanEqual(partAppIds, timeThreshold);
+ /**
+ * 调度 CRON 表达式类型的任务
+ */
+ private void scheduleCornJob(List appIds) {
- // 顺序:先推入进时间轮 -> 写jobLog表 -> 更新nextTriggerTime(原则:宁可重复执行,也不能不调度)
+ // 清理不需要维护的数据
+ WorkerManagerService.clean(appIds);
+
+ long nowTime = System.currentTimeMillis();
+ long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
+ Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> {
+
+ try {
+
+ // 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行
+ List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
+
+ // 1. 批量写日志表
+ Map jobId2InstanceId = Maps.newHashMap();
+ List executeLogs = Lists.newLinkedList();
+ jobInfos.forEach(jobInfoDO -> {
+
+ ExecuteLogDO executeLog = new ExecuteLogDO();
+ executeLog.setJobId(jobInfoDO.getId());
+ executeLog.setInstanceId(IdGenerateService.allocate());
+ executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
+ executeLog.setExpectedTriggerTime(jobInfoDO.getNextTriggerTime());
+ executeLog.setGmtCreate(new Date());
+ executeLog.setGmtModified(executeLog.getGmtCreate());
+
+ executeLogs.add(executeLog);
+
+ jobId2InstanceId.put(executeLog.getJobId(), executeLog.getInstanceId());
+ });
+ executeLogRepository.saveAll(executeLogs);
+ executeLogRepository.flush();
+
+ // 2. 推入时间轮中等待调度执行
+ jobInfos.forEach(jobInfoDO -> {
+
+ long targetTriggerTime = jobInfoDO.getNextTriggerTime();
+ long delay = 0;
+ if (targetTriggerTime < nowTime) {
+ log.warn("[JobScheduleService] Job({}) was delayed.", jobInfoDO);
+ }else {
+ delay = targetTriggerTime - nowTime;
+ }
+
+ HashedWheelTimerHolder.TIMER.schedule(() -> {
+ dispatchService.dispatch(jobInfoDO, jobId2InstanceId.get(jobInfoDO.getId()));
+ }, delay, TimeUnit.MILLISECONDS);
+
+ });
+
+ // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms)
+ Date now = new Date();
+ List updatedJobInfos = Lists.newLinkedList();
+ jobInfos.forEach(jobInfoDO -> {
+
+ try {
+ CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
+ Date nextTriggerTime = cronExpression.getNextValidTimeAfter(now);
+
+ JobInfoDO updatedJobInfo = new JobInfoDO();
+ BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
+ updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
+ updatedJobInfo.setGmtModified(now);
+
+ updatedJobInfos.add(updatedJobInfo);
+ } catch (Exception e) {
+ log.error("[JobScheduleService] calculate next trigger time for job({}) failed.", jobInfoDO, e);
+ }
+ });
+ jobInfoRepository.saveAll(updatedJobInfos);
+ jobInfoRepository.flush();
- }catch (Exception e) {
-
- }
+ }catch (Exception e) {
+ log.error("[JobScheduleService] schedule job failed.", e);
}
});
}
- private static String genLock(Long appId) {
- return String.format(SCHEDULE_LOCK, appId);
+ /**
+ * 调度 FIX_RATE 和 FIX_DELAY 的任务
+ */
+ private void scheduleFrequentJob(List appIds) {
+
+ List fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV());
+ List fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV());
+
+ List jobIds = Lists.newLinkedList();
+ Map jobId2JobInfo = Maps.newHashMap();
+ Consumer consumer = jobInfo -> {
+ jobIds.add(jobInfo.getId());
+ jobId2JobInfo.put(jobInfo.getId(), jobInfo);
+ };
+ fixDelayJobs.forEach(consumer);
+ fixRateJobs.forEach(consumer);
+
+ if (CollectionUtils.isEmpty(jobIds)) {
+ log.debug("[JobScheduleService] no frequent job need to schedule.");
+ return;
+ }
+
+ // 查询 ExecuteLog 表,不存在或非运行状态则重新调度
+ List executeLogDOS = executeLogRepository.findByJobIdIn(jobIds);
+ executeLogDOS.forEach(executeLogDO -> {
+ if (executeLogDO.getStatus() == InstanceStatus.RUNNING.getV()) {
+ jobId2JobInfo.remove(executeLogDO.getJobId());
+ }
+ });
+
+ // 重新Dispatch
+ jobId2JobInfo.values().forEach(jobInfoDO -> {
+
+ });
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java
index a0ce3dce..9dcb503a 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java
@@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
+import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.web.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
@@ -14,6 +15,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
+import java.util.Date;
/**
* 任务信息管理 Controller
@@ -29,15 +31,22 @@ public class JobController {
private JobInfoRepository jobInfoRepository;
@PostMapping("/save")
- public ResultDTO saveJobInfo(ModifyJobInfoRequest request) {
+ public ResultDTO saveJobInfo(ModifyJobInfoRequest request) throws Exception {
JobInfoDO jobInfoDO = new JobInfoDO();
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
+ TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpression());
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
- jobInfoDO.setTimeExpressionType(TimeExpressionType.valueOf(request.getTimeExpression()).getV());
+ jobInfoDO.setTimeExpressionType(timeExpressionType.getV());
+ // 计算下次调度时间
+ if (timeExpressionType == TimeExpressionType.CRON) {
+ CronExpression cronExpression = new CronExpression(request.getTimeExpression());
+ Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date());
+ jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
+ }
jobInfoRepository.saveAndFlush(jobInfoDO);
return ResultDTO.success(null);
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java
index 8499d265..42ffd3d9 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java
@@ -11,6 +11,8 @@ import lombok.Data;
@Data
public class ModifyJobInfoRequest {
+ // null -> 插入,否则为更新
+ private Long id;
/* ************************** 任务基本信息 ************************** */
// 任务名称
private String jobName;
@@ -20,6 +22,8 @@ public class ModifyJobInfoRequest {
private Long appId;
// 任务分组名称(仅用于前端展示的分组)
private String groupName;
+ // 任务自带的参数
+ private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
@@ -36,6 +40,7 @@ public class ModifyJobInfoRequest {
// 执行器信息
private String processorInfo;
+
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数
private Integer maxInstanceNum;
@@ -45,4 +50,11 @@ public class ModifyJobInfoRequest {
private Long instanceTimeLimit;
// 任务的每一个Task超时时间
private Long taskTimeLimit;
+
+ /* ************************** 重试配置 ************************** */
+ private Integer instanceRetryNum;
+ private Integer taskRetryNum;
+
+ // 1 正常运行,2 停止(不再调度)
+ private Integer status;
}
diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java
index eeb6dcfc..616cf745 100644
--- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java
+++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java
@@ -61,4 +61,9 @@ public class UtilsTest {
Thread.sleep(277777777);
}
+ @Test
+ public void testCronExpression() {
+
+ }
+
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java
index 440324d3..871569ba 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java
@@ -30,8 +30,8 @@ public class ProcessorTrackerActor extends AbstractActor {
* 处理来自TaskTracker的task执行请求
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
- String jobId = req.getInstanceInfo().getJobId();
- String instanceId = req.getInstanceInfo().getInstanceId();
+ Long jobId = req.getInstanceInfo().getJobId();
+ Long instanceId = req.getInstanceInfo().getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> {
ProcessorTracker pt = new ProcessorTracker(req);
log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId);
@@ -50,7 +50,7 @@ public class ProcessorTrackerActor extends AbstractActor {
private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) {
- String instanceId = req.getInstanceId();
+ Long instanceId = req.getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId);
if (processorTracker == null) {
log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId);
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java
index 1dde9355..9d314592 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java
@@ -126,7 +126,7 @@ public class TaskTrackerActor extends AbstractActor {
* 服务器任务调度处理器
*/
private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
- String instanceId = req.getInstanceId();
+ Long instanceId = req.getInstanceId();
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(instanceId);
if (taskTracker != null) {
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java
deleted file mode 100644
index 41ee4427..00000000
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.github.kfcfans.oms.worker.common.constants;
-
-import com.google.common.base.Splitter;
-
-/**
- * splitter & joiner
- *
- * @author tjq
- * @since 2020/3/17
- */
-public class CommonSJ {
- public static final Splitter commaSplitter = Splitter.on(",");
-}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
index 7d2d5995..38d668d8 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
@@ -44,7 +44,7 @@ public class ProcessorRunnable implements Runnable {
public void innerRun() {
String taskId = task.getTaskId();
- String instanceId = task.getInstanceId();
+ Long instanceId = task.getInstanceId();
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java
index 24f5fa37..c252f4a6 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatusHolder.java
@@ -1,6 +1,5 @@
package com.github.kfcfans.oms.worker.core.ha;
-import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -18,12 +17,10 @@ public class ProcessorTrackerStatusHolder {
private final Map ip2Status;
- public ProcessorTrackerStatusHolder(String allWorkerAddress) {
+ public ProcessorTrackerStatusHolder(List allWorkerAddress) {
ip2Status = Maps.newConcurrentMap();
-
- List addressList = CommonSJ.commaSplitter.splitToList(allWorkerAddress);
- addressList.forEach(ip -> {
+ allWorkerAddress.forEach(ip -> {
ProcessorTrackerStatus pts = new ProcessorTrackerStatus();
pts.init(ip);
ip2Status.put(ip, pts);
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java
index e5d59b6d..4e25c7c2 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java
@@ -33,7 +33,7 @@ public class ProcessorTracker {
// 任务实例信息
private InstanceInfo instanceInfo;
// 冗余 instanceId,方便日志
- private String instanceId;
+ private Long instanceId;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java
index cc6d6bf5..e8b047c2 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java
@@ -14,23 +14,23 @@ import java.util.function.Function;
*/
public class ProcessorTrackerPool {
- private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap();
+ private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap();
/**
* 获取 ProcessorTracker,如果不存在则创建
*/
- public static ProcessorTracker getProcessorTracker(String instanceId, Function creator) {
+ public static ProcessorTracker getProcessorTracker(Long instanceId, Function creator) {
return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator);
}
/**
* 获取 ProcessorTracker
*/
- public static ProcessorTracker getProcessorTracker(String instanceId) {
+ public static ProcessorTracker getProcessorTracker(Long instanceId) {
return instanceId2ProcessorTracker.get(instanceId);
}
- public static void removeProcessorTracker(String instanceId) {
+ public static void removeProcessorTracker(Long instanceId) {
instanceId2ProcessorTracker.remove(instanceId);
}
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java
index 96a39e28..b35e56f6 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java
@@ -92,7 +92,7 @@ public class TaskTracker {
* 更新任务状态
* 任务状态机只允许数字递增
*/
- public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result) {
+ public void updateTaskStatus(Long instanceId, String taskId, int newStatus, @Nullable String result) {
boolean updateResult;
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
@@ -217,7 +217,7 @@ public class TaskTracker {
CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow());
// 1. 通知 ProcessorTracker 释放资源
- String instanceId = instanceInfo.getInstanceId();
+ Long instanceId = instanceInfo.getInstanceId();
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
@@ -257,7 +257,7 @@ public class TaskTracker {
}
Stopwatch stopwatch = Stopwatch.createStarted();
- String instanceId = instanceInfo.getInstanceId();
+ Long instanceId = instanceInfo.getInstanceId();
// 1. 获取可以派发任务的 ProcessorTracker
List availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();
@@ -323,7 +323,7 @@ public class TaskTracker {
private void innerRun() {
- final String instanceId = instanceInfo.getInstanceId();
+ Long instanceId = instanceInfo.getInstanceId();
// 1. 查询统计信息
Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId);
@@ -396,7 +396,7 @@ public class TaskTracker {
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
req.setResult(resultTask.getResult());
- req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getValue() : InstanceStatus.FAILED.getValue());
+ req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
CompletionStage