replace IP to Address in Worker,so User need't to specify the port

This commit is contained in:
tjq 2020-04-06 22:43:14 +08:00
parent a49d2e7e3a
commit b69cb6fc14
26 changed files with 155 additions and 121 deletions

View File

@ -53,13 +53,6 @@
<version>${akka.version}</version> <version>${akka.version}</version>
</dependency> </dependency>
<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<!-- SpringBoot --> <!-- SpringBoot -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server;
import com.github.kfcfans.oms.server.core.akka.OhMyServer; import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/** /**
* SpringBoot 启动入口 * SpringBoot 启动入口
@ -10,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @author tjq * @author tjq
* @since 2020/3/29 * @since 2020/3/29
*/ */
@EnableScheduling
@SpringBootApplication @SpringBootApplication
public class OhMyApplication { public class OhMyApplication {

View File

@ -1,36 +0,0 @@
package com.github.kfcfans.oms.server.common.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ZooKeeper 连接配置
*
* @author tjq
* @since 2020/4/4
*/
@Configuration
public class CuratorConfig {
@Value("${zookeeper.address}")
private String zkAddress;
@Bean("omsCurator")
public CuratorFramework initCurator() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.namespace("oms")
// zookeeper 地址多值用 , 分割即可
.connectString(zkAddress)
.sessionTimeoutMs(1000)
.connectionTimeoutMs(1000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
return client;
}
}

View File

@ -13,7 +13,7 @@ import java.util.Date;
*/ */
@Data @Data
@Entity @Entity
@Table(name = "app_info") @Table(name = "app_info", uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})})
public class AppInfoDO { public class AppInfoDO {
@Id @Id

View File

@ -14,7 +14,7 @@ import java.util.List;
public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> { public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time); List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpression(List<Long> appIds, int status, int timeExpressionType); List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionType(List<Long> appIds, int status, int timeExpressionType);
} }

View File

@ -121,7 +121,7 @@ public class ServerSelectService {
downServerCache.remove(serverAddress); downServerCache.remove(serverAddress);
return response.isSuccess(); return response.isSuccess();
}catch (Exception e) { }catch (Exception e) {
log.warn("[ServerSelectService] server({}) was down, try to elect a new server.", serverAddress); log.warn("[ServerSelectService] server({}) was down, I will be the new server.", serverAddress);
} }
downServerCache.add(serverAddress); downServerCache.add(serverAddress);
return false; return false;

View File

@ -74,7 +74,7 @@ public class JobScheduleService {
log.error("[JobScheduleService] schedule cron job failed.", e); log.error("[JobScheduleService] schedule cron job failed.", e);
} }
log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch); log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch);
stopwatch.reset(); stopwatch.reset().start();
try { try {
scheduleFrequentJob(allAppIds); scheduleFrequentJob(allAppIds);
@ -100,10 +100,17 @@ public class JobScheduleService {
try { try {
// 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 // 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
if (CollectionUtils.isEmpty(jobInfos)) {
log.debug("[JobScheduleService] no cron job need to schedule");
return;
}
// 1. 批量写日志表 // 1. 批量写日志表
Map<Long, Long> jobId2InstanceId = Maps.newHashMap(); Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
log.info("[JobScheduleService] try to schedule some cron jobs, their jobId is {}.", jobId2InstanceId.keySet());
List<ExecuteLogDO> executeLogs = Lists.newLinkedList(); List<ExecuteLogDO> executeLogs = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> { jobInfos.forEach(jobInfoDO -> {
@ -173,8 +180,8 @@ public class JobScheduleService {
*/ */
private void scheduleFrequentJob(List<Long> appIds) { private void scheduleFrequentJob(List<Long> appIds) {
List<JobInfoDO> fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV()); List<JobInfoDO> fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV());
List<JobInfoDO> fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV()); List<JobInfoDO> fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV());
List<Long> jobIds = Lists.newLinkedList(); List<Long> jobIds = Lists.newLinkedList();
Map<Long, JobInfoDO> jobId2JobInfo = Maps.newHashMap(); Map<Long, JobInfoDO> jobId2JobInfo = Maps.newHashMap();

View File

@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -32,6 +33,11 @@ public class AppInfoController {
AppInfoDO appInfoDO = new AppInfoDO(); AppInfoDO appInfoDO = new AppInfoDO();
BeanUtils.copyProperties(appInfoRequest, appInfoDO); BeanUtils.copyProperties(appInfoRequest, appInfoDO);
Date now = new Date();
if (appInfoRequest.getId() == null) {
appInfoDO.setGmtCreate(now);
}
appInfoDO.setGmtModified(now);
appInfoRepository.saveAndFlush(appInfoDO); appInfoRepository.saveAndFlush(appInfoDO);
return ResultDTO.success(null); return ResultDTO.success(null);
} }

View File

@ -37,17 +37,22 @@ public class JobController {
BeanUtils.copyProperties(request, jobInfoDO); BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值 // 拷贝枚举值
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpression()); TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpressionType());
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV()); jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV()); jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
jobInfoDO.setTimeExpressionType(timeExpressionType.getV()); jobInfoDO.setTimeExpressionType(timeExpressionType.getV());
// 计算下次调度时间 // 计算下次调度时间
Date now = new Date();
if (timeExpressionType == TimeExpressionType.CRON) { if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(request.getTimeExpression()); CronExpression cronExpression = new CronExpression(request.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
} }
if (request.getId() == null) {
jobInfoDO.setGmtCreate(now);
}
jobInfoDO.setGmtModified(now);
jobInfoRepository.saveAndFlush(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO);
return ResultDTO.success(null); return ResultDTO.success(null);
} }

View File

@ -4,6 +4,7 @@ import lombok.Data;
/** /**
* 创建/修改 JobInfo 请求 * 创建/修改 JobInfo 请求
* 测试用例快速复制区域MAP_REDUCEEMBEDDED_JAVACRONcom.github.kfcfans.oms.processors.TestMapReduceProcessor
* *
* @author tjq * @author tjq
* @since 2020/3/30 * @since 2020/3/30

View File

@ -11,7 +11,4 @@ spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5 spring.datasource.hikari.minimum-idle=5
# JPA 相关配置 # JPA 相关配置
spring.jpa.show-sql=true spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update spring.jpa.hibernate.ddl-auto=update
# ZooKeeper(多值逗号分割)
zookeeper.address=115.159.215.229:2181

View File

@ -1,7 +1,11 @@
package com.github.kfcfans.oms.server.test; package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO; import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository; import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
import org.assertj.core.util.Lists; import org.assertj.core.util.Lists;
import org.junit.Test; import org.junit.Test;
@ -22,6 +26,8 @@ import java.util.List;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class RepositoryTest { public class RepositoryTest {
@Resource
private JobInfoRepository jobInfoRepository;
@Resource @Resource
private OmsLockRepository omsLockRepository; private OmsLockRepository omsLockRepository;
@ -40,4 +46,10 @@ public class RepositoryTest {
omsLockRepository.flush(); omsLockRepository.flush();
} }
@Test
public void testSelectCronJobSQL() {
List<JobInfoDO> result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis());
System.out.println(result);
}
} }

View File

@ -1,11 +1,13 @@
package com.github.kfcfans.oms.server.test; package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer; import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer;
import com.github.kfcfans.oms.server.common.utils.timewheel.TimerFuture; import com.github.kfcfans.oms.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.oms.server.common.utils.timewheel.TimerTask; import com.github.kfcfans.oms.server.common.utils.timewheel.TimerTask;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.junit.Test; import org.junit.Test;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -62,8 +64,11 @@ public class UtilsTest {
} }
@Test @Test
public void testCronExpression() { public void testCronExpression() throws Exception {
String cron = "0 * * * * ? *";
CronExpression cronExpression = new CronExpression(cron);
final Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(new Date());
System.out.println(nextValidTimeAfter);
} }
} }

View File

@ -5,9 +5,12 @@ import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.HttpUtils; import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.Map;
/** /**
* 服务发现 * 服务发现
* *
@ -17,32 +20,58 @@ import org.springframework.util.StringUtils;
@Slf4j @Slf4j
public class ServerDiscoveryService { public class ServerDiscoveryService {
private static final Map<String, String> IP2ADDRESS = Maps.newHashMap();
private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s"; private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s";
@SuppressWarnings("rawtypes")
public static String discovery() { public static String discovery() {
if (IP2ADDRESS.isEmpty()) {
OhMyWorker.getConfig().getServerAddress().forEach(x -> IP2ADDRESS.put(x.split(":")[0], x));
}
String result = null; String result = null;
// 先对当前机器发起请求
String currentServer = OhMyWorker.getCurrentServer();
if (!StringUtils.isEmpty(currentServer)) {
String ip = currentServer.split(":")[0];
// 直接请求当前Server的HTTP服务可以少一次网络开销减轻Server负担
String firstServerAddress = IP2ADDRESS.get(ip);
result = acquire(firstServerAddress);
}
for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) { for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {
if (StringUtils.isEmpty(result)) {
String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer()); result = acquire(httpServerAddress);
try { }else {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); break;
}catch (Exception ignore) {
}
if (!StringUtils.isEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
String server = resultDTO.getData().toString();
log.debug("[OMS-ServerDiscoveryService] current server is {}.", server);
return server;
}
} }
} }
log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
return null; if (StringUtils.isEmpty(result)) {
log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
return null;
}else {
log.debug("[OMS-ServerDiscoveryService] current server is {}.", result);
return result;
}
} }
@SuppressWarnings("rawtypes")
private static String acquire(String httpServerAddress) {
String result = null;
String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer());
try {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
}catch (Exception ignore) {
}
if (!StringUtils.isEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
}
}
return null;
}
} }

View File

@ -9,6 +9,7 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils; import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
/** /**
* Worker健康度定时上报 * Worker健康度定时上报
@ -23,8 +24,13 @@ public class WorkerHealthReportRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics(); // 没有可用Server无法上报
String currentServer = OhMyWorker.getCurrentServer();
if (StringUtils.isEmpty(currentServer)) {
return;
}
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
WorkerHeartbeat heartbeat = new WorkerHeartbeat(); WorkerHeartbeat heartbeat = new WorkerHeartbeat();
heartbeat.setSystemMetrics(systemMetrics); heartbeat.setSystemMetrics(systemMetrics);
@ -34,7 +40,7 @@ public class WorkerHealthReportRunnable implements Runnable {
heartbeat.setHeartbeatTime(System.currentTimeMillis()); heartbeat.setHeartbeatTime(System.currentTimeMillis());
// 发送请求 // 发送请求
String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath);
actorSelection.tell(heartbeat, null); actorSelection.tell(heartbeat, null);
} }

View File

@ -14,18 +14,14 @@ public class AkkaUtils {
/** /**
* akka://<actor system>@<hostname>:<port>/<actor path> * akka://<actor system>@<hostname>:<port>/<actor path>
*/ */
private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/user/%s"; private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
private static final String AKKA_SERVER_NODE_PATH = "akka://%s@%s/user/%s"; public static String getAkkaWorkerPath(String address, String actorName) {
return String.format(AKKA_NODE_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, address, actorName);
public static String getAkkaRemotePath(String ip, String actorName) {
Integer configPort = OhMyWorker.getConfig().getListeningPort();
int port = configPort == null ? RemoteConstant.DEFAULT_CLIENT_PORT : configPort;
return String.format(AKKA_REMOTE_NODE_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, ip, port, actorName);
} }
public static String getAkkaServerNodePath(String actorName) { public static String getAkkaServerPath(String actorName) {
return String.format(AKKA_SERVER_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
} }
} }

View File

@ -17,8 +17,8 @@ public class ProcessorTrackerStatus {
private static final int DISPATCH_THRESHOLD = 20; private static final int DISPATCH_THRESHOLD = 20;
private static final int HEARTBEAT_TIMEOUT_MS = 60000; private static final int HEARTBEAT_TIMEOUT_MS = 60000;
// 冗余存储一份 IP 地址 // 冗余存储一份 address 地址
private String ip; private String address;
// 上次活跃时间 // 上次活跃时间
private long lastActiveTime; private long lastActiveTime;
// 等待执行任务数 // 等待执行任务数
@ -31,8 +31,8 @@ public class ProcessorTrackerStatus {
/** /**
* 初始化 ProcessorTracker此时并未持有实际的 ProcessorTracker 状态 * 初始化 ProcessorTracker此时并未持有实际的 ProcessorTracker 状态
*/ */
public void init(String ptIP) { public void init(String address) {
this.ip = ptIP; this.address = address;
this.lastActiveTime = System.currentTimeMillis(); this.lastActiveTime = System.currentTimeMillis();
this.remainTaskNum = 0; this.remainTaskNum = 0;
this.dispatched = false; this.dispatched = false;
@ -50,7 +50,7 @@ public class ProcessorTrackerStatus {
return; return;
} }
this.ip = req.getIp(); this.address = req.getAddress();
this.lastActiveTime = req.getTime(); this.lastActiveTime = req.getTime();
this.remainTaskNum = req.getRemainTaskNum(); this.remainTaskNum = req.getRemainTaskNum();
this.dispatched = true; this.dispatched = true;

View File

@ -15,27 +15,28 @@ import java.util.Map;
*/ */
public class ProcessorTrackerStatusHolder { public class ProcessorTrackerStatusHolder {
private final Map<String, ProcessorTrackerStatus> ip2Status; // ProcessorTracker的address(IP:Port) -> 状态
private final Map<String, ProcessorTrackerStatus> address2Status;
public ProcessorTrackerStatusHolder(List<String> allWorkerAddress) { public ProcessorTrackerStatusHolder(List<String> allWorkerAddress) {
ip2Status = Maps.newConcurrentMap(); address2Status = Maps.newConcurrentMap();
allWorkerAddress.forEach(ip -> { allWorkerAddress.forEach(address -> {
ProcessorTrackerStatus pts = new ProcessorTrackerStatus(); ProcessorTrackerStatus pts = new ProcessorTrackerStatus();
pts.init(ip); pts.init(address);
ip2Status.put(ip, pts); address2Status.put(address, pts);
}); });
} }
public ProcessorTrackerStatus getProcessorTrackerStatus(String ip) { public ProcessorTrackerStatus getProcessorTrackerStatus(String address) {
return ip2Status.get(ip); return address2Status.get(address);
} }
/** /**
* 根据 ProcessorTracker 的心跳更新状态 * 根据 ProcessorTracker 的心跳更新状态
*/ */
public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) { public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) {
ProcessorTrackerStatus processorTrackerStatus = ip2Status.get(heartbeatReq.getIp()); ProcessorTrackerStatus processorTrackerStatus = address2Status.get(heartbeatReq.getAddress());
processorTrackerStatus.update(heartbeatReq); processorTrackerStatus.update(heartbeatReq);
} }
@ -45,9 +46,9 @@ public class ProcessorTrackerStatusHolder {
public List<String> getAvailableProcessorTrackers() { public List<String> getAvailableProcessorTrackers() {
List<String> result = Lists.newLinkedList(); List<String> result = Lists.newLinkedList();
ip2Status.forEach((ip, ptStatus) -> { address2Status.forEach((address, ptStatus) -> {
if (ptStatus.available()) { if (ptStatus.available()) {
result.add(ip); result.add(address);
} }
}); });
return result; return result;
@ -57,7 +58,7 @@ public class ProcessorTrackerStatusHolder {
* 获取所有 ProcessorTracker 的IP地址包括不可用状态 * 获取所有 ProcessorTracker 的IP地址包括不可用状态
*/ */
public List<String> getAllProcessorTrackers() { public List<String> getAllProcessorTrackers() {
return Lists.newArrayList(ip2Status.keySet()); return Lists.newArrayList(address2Status.keySet());
} }
/** /**
@ -66,7 +67,7 @@ public class ProcessorTrackerStatusHolder {
public List<String> getAllDisconnectedProcessorTrackers() { public List<String> getAllDisconnectedProcessorTrackers() {
List<String> result = Lists.newLinkedList(); List<String> result = Lists.newLinkedList();
ip2Status.forEach((ip, ptStatus) -> { address2Status.forEach((ip, ptStatus) -> {
if (ptStatus.isTimeout()) { if (ptStatus.isTimeout()) {
result.add(ip); result.add(ip);
} }

View File

@ -53,7 +53,7 @@ public class ProcessorTracker {
this.instanceInfo = request.getInstanceInfo(); this.instanceInfo = request.getInstanceInfo();
this.instanceId = request.getInstanceInfo().getInstanceId(); this.instanceId = request.getInstanceInfo().getInstanceId();
this.taskTrackerAddress = request.getTaskTrackerAddress(); this.taskTrackerAddress = request.getTaskTrackerAddress();
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
// 初始化 // 初始化

View File

@ -198,7 +198,7 @@ public class TaskTracker {
rootTask.setInstanceId(instanceInfo.getInstanceId()); rootTask.setInstanceId(instanceInfo.getInstanceId());
rootTask.setTaskId(TaskConstant.ROOT_TASK_ID); rootTask.setTaskId(TaskConstant.ROOT_TASK_ID);
rootTask.setFailedCnt(0); rootTask.setFailedCnt(0);
rootTask.setAddress(NetUtils.getLocalHost()); rootTask.setAddress(OhMyWorker.getWorkerAddress());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setLastModifiedTime(System.currentTimeMillis()); rootTask.setLastModifiedTime(System.currentTimeMillis());
@ -221,7 +221,7 @@ public class TaskTracker {
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId); stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath);
// 不可靠通知ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 // 不可靠通知ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭
ptActor.tell(stopRequest, null); ptActor.tell(stopRequest, null);
@ -289,7 +289,7 @@ public class TaskTracker {
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
} }
String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); String ptActorPath = AkkaUtils.getAkkaWorkerPath(ptAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath);
ptActor.tell(startTaskReq, null); ptActor.tell(startTaskReq, null);
@ -380,7 +380,7 @@ public class TaskTracker {
TaskDO newLastTask = new TaskDO(); TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(TaskConstant.LAST_TASK_ID); newLastTask.setTaskId(TaskConstant.LAST_TASK_ID);
newLastTask.setAddress(NetUtils.getLocalHost()); newLastTask.setAddress(OhMyWorker.getWorkerAddress());
addTask(Lists.newArrayList(newLastTask)); addTask(Lists.newArrayList(newLastTask));
} }
@ -388,7 +388,7 @@ public class TaskTracker {
finished.set(finishedBoolean); finished.set(finishedBoolean);
} }
String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
// 3. 执行完毕报告服务器 // 3. 执行完毕报告服务器

View File

@ -25,7 +25,7 @@ public class TaskDO {
private String taskName; private String taskName;
// 任务对象序列化后的二进制数据 // 任务对象序列化后的二进制数据
private byte[] taskContent; private byte[] taskContent;
// 对于JobTracker为workerAddress对于普通Worker为jobTrackerAddress // 对于TaskTracker为workerAddress派发地址对于普通Worker为TaskTrackerAddress汇报地址
private String address; private String address;
// 任务状态010代表 JobTracker 使用1120代表普通Worker使用 // 任务状态010代表 JobTracker 使用1120代表普通Worker使用
private Integer status; private Integer status;

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.worker.pojo.request; package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -29,13 +30,13 @@ public class ProcessorTrackerStatusReportReq {
/** /**
* 本机地址 * 本机地址
*/ */
private String ip; private String address;
public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) { public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) {
this.instanceId = instanceId; this.instanceId = instanceId;
this.remainTaskNum = remainTaskNum; this.remainTaskNum = remainTaskNum;
this.time = System.currentTimeMillis(); this.time = System.currentTimeMillis();
this.ip = NetUtils.getLocalHost(); this.address = OhMyWorker.getWorkerAddress();
} }
} }

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.worker.pojo.request; package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import lombok.Getter; import lombok.Getter;
@ -31,9 +32,12 @@ public class TaskTrackerStartTaskReq implements Serializable {
private int taskCurrentRetryNums; private int taskCurrentRetryNums;
/**
* 创建 TaskTrackerStartTaskReq该构造方法必须在 TaskTracker 节点调用
*/
public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) { public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) {
this.taskTrackerAddress = NetUtils.getLocalHost(); this.taskTrackerAddress = OhMyWorker.getWorkerAddress();
this.instanceInfo = instanceInfo; this.instanceInfo = instanceInfo;
this.taskId = task.getTaskId(); this.taskId = task.getTaskId();

View File

@ -57,7 +57,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常 // 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
boolean requestSucceed = false; boolean requestSucceed = false;
try { try {
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS));
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);

View File

@ -36,7 +36,7 @@ public class ProcessorTrackerTest {
worker.init(); worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
remoteProcessorTracker = testAS.actorSelection(akkaRemotePath); remoteProcessorTracker = testAS.actorSelection(akkaRemotePath);
} }

View File

@ -36,21 +36,26 @@ public class TaskTrackerTest {
worker.init(); worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.Task_TRACKER_ACTOR_NAME); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":25520", RemoteConstant.Task_TRACKER_ACTOR_NAME);
remoteTaskTracker = testAS.actorSelection(akkaRemotePath); remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
} }
@Test
public void justStartWorkerToTestServer() throws Exception {
Thread.sleep(277277277);
}
@Test @Test
public void testStandaloneJob() throws Exception { public void testStandaloneJob() throws Exception {
remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null); remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null);
Thread.sleep(500000); Thread.sleep(5000000);
} }
@Test @Test
public void testMapReduceJob() throws Exception { public void testMapReduceJob() throws Exception {
remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null); remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null);
Thread.sleep(500000); Thread.sleep(5000000);
} }
private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) { private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) {
@ -58,7 +63,7 @@ public class TaskTrackerTest {
req.setJobId(1L); req.setJobId(1L);
req.setInstanceId(10086L); req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost())); req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":25520"));
req.setJobParams("this is job Params"); req.setJobParams("this is job Params");
req.setInstanceParams("this is instance Params"); req.setInstanceParams("this is instance Params");