start to develop the HashWheel

This commit is contained in:
tjq 2020-04-02 17:11:43 +08:00
parent 02de19357c
commit 9130b8d4f4
26 changed files with 233 additions and 19 deletions

View File

@ -26,7 +26,7 @@ public class RemoteConstant {
/* ************************ AKKA SERVER ************************ */
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
public static final String SERVER_ACTOR_NAME = "server_actor";
public static final String SERVER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
public static final String SERVER_AKKA_CONFIG_NAME = "oms-server.akka.conf";
/* ************************ OTHERS ************************ */

View File

@ -31,7 +31,6 @@ public class CommonUtils {
try {
return executor.get();
}catch (Exception e) {
log.warn("[CommonUtils] executeWithRetry failed, system will retry after {}ms.", intervalMS, e);
Thread.sleep(intervalMS);
}
}

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.worker.common.utils;
package com.github.kfcfans.common.utils;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.config;
package com.github.kfcfans.oms.server.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@ -0,0 +1,44 @@
package com.github.kfcfans.oms.server.core.actors;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.utils.NetUtils;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* 服务端 ActorSystem 启动器
*
* @author tjq
* @since 2020/4/2
*/
@Slf4j
public class OhMyServer {
public static ActorSystem actorSystem;
private static String actorSystemAddress;
public void init() {
// 1. 启动 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
String localIP = NetUtils.getLocalHost();
int port = NetUtils.getAvailablePort();
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
overrideConfig.put("akka.remote.artery.canonical.port", port);
actorSystemAddress = localIP + ":" + port;
log.info("[OhMyWorker] akka-remote server address: {}", actorSystemAddress);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
}
}

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.actors;
package com.github.kfcfans.oms.server.core.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.WorkerHeartbeat;

View File

@ -45,7 +45,9 @@ public class JobInfoDO {
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 并发度同时执行的线程数量
// 最大同时运行任务数
private Integer maxInstanceNum;
// 并发度同时执行某个任务的最大线程数量
private Integer concurrency;
// 任务整体超时时间
private Long instanceTimeLimit;

View File

@ -0,0 +1,37 @@
package com.github.kfcfans.oms.server.persistence.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.util.Date;
/**
* 数据库锁
*
* @author tjq
* @since 2020/4/2
*/
@Data
@Entity
@NoArgsConstructor
@Table(name = "oms_lock", uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})})
public class OmsLockDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String lockName;
private String owner;
private Date gmtCreate;
private Date gmtModified;
public OmsLockDO(String lockName, String owner) {
this.lockName = lockName;
this.owner = owner;
this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate;
}
}

View File

@ -0,0 +1,22 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import javax.transaction.Transactional;
/**
* 利用唯一性约束作为数据库锁
*
* @author tjq
* @since 2020/4/2
*/
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Modifying
@Transactional
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
int deleteByLockName(String lockName);
}

View File

@ -0,0 +1,24 @@
package com.github.kfcfans.oms.server.service;
/**
* 锁服务
*
* @author tjq
* @since 2020/4/2
*/
public interface LockService {
/**
* 上锁获取锁立即返回不会阻塞等待锁
* @param name 锁名称
* @return true -> 获取到锁false -> 未获取到锁
*/
boolean lock(String name);
/**
* 释放锁
* @param name 锁名称
*/
void unlock(String name);
}

View File

@ -0,0 +1,52 @@
package com.github.kfcfans.oms.server.service.impl;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import com.github.kfcfans.oms.server.service.LockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 基于数据库实现分布式锁
*
* @author tjq
* @since 2020/4/2
*/
@Slf4j
@Service
public class DatabaseLockService implements LockService {
@Resource
private OmsLockRepository omsLockRepository;
@Override
public boolean lock(String name) {
try {
OmsLockDO lock = new OmsLockDO(name, NetUtils.getLocalHost());
omsLockRepository.saveAndFlush(lock);
return true;
}catch (DataIntegrityViolationException ignore) {
log.info("[DatabaseLockService] other thread get the lock {}.", name);
} catch (Exception e) {
log.error("[DatabaseLockService] lock {} failed.", name, e);
}
return false;
}
@Override
public void unlock(String name) {
try {
CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
}catch (Exception e) {
log.error("[DatabaseLockService] unlock {} failed.", name, e);
}
}
}

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.oms.server.web;
import com.github.kfcfans.oms.server.pojo.ResultDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.pojo;
package com.github.kfcfans.oms.server.web;
import lombok.Getter;
import lombok.Setter;

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.pojo.ResultDTO;
import com.github.kfcfans.oms.server.web.ResultDTO;
import com.github.kfcfans.oms.server.web.request.ModifyAppInfoRequest;
import lombok.Data;
import org.springframework.beans.BeanUtils;

View File

@ -4,7 +4,7 @@ import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.pojo.ResultDTO;
import com.github.kfcfans.oms.server.web.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import org.springframework.beans.BeanUtils;

View File

@ -37,6 +37,8 @@ public class ModifyJobInfoRequest {
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数
private Integer maxInstanceNum;
// 并发度同时执行的线程数量
private Integer concurrency;
// 任务整体超时时间

View File

@ -10,7 +10,7 @@ akka {
transport = tcp # See Selecting a transport below
# over write by code
canonical.hostname = "127.0.0.1"
canonical.port = 10086
canonical.port = 0
}
}
}

View File

@ -0,0 +1,33 @@
package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.oms.server.service.LockService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* 服务测试
*
* @author tjq
* @since 2020/4/2
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ServiceTest {
@Resource
private LockService lockService;
@Test
public void testLockService() {
String lockName = "myLock";
lockService.lock(lockName);
lockService.lock(lockName);
lockService.unlock(lockName);
}
}

View File

@ -7,7 +7,7 @@ import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;

View File

@ -13,7 +13,7 @@ import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.core.ha.ProcessorTrackerStatusHolder;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import lombok.Data;
import lombok.NoArgsConstructor;

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import lombok.Getter;

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.google.common.collect.Lists;

View File

@ -8,7 +8,7 @@ import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.typesafe.config.ConfigFactory;

View File

@ -9,7 +9,7 @@ import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils;
import org.junit.jupiter.api.Test;