diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java index 3e619aee0..fcc9c77d9 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java @@ -749,6 +749,7 @@ public Observable namedJobInfo(final String jobName) { return response.getContent() .map(event -> { try { + logger.info("[fdc-91] namedJobInfo - {}", event); return objectMapper.readValue(event.contentAsString(), NamedJobInfo.class); } catch (IOException e) { throw new RuntimeException("Invalid namedJobInfo json: " + e.getMessage(), e); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java index df6654a50..bc8475d3c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/IJobClustersManager.java @@ -78,6 +78,7 @@ public interface IJobClustersManager { // worker related messages void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request); + void onGetLastLaunchedJobIdSubject(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request); void onWorkerEvent(WorkerEvent r); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java index 608f00e6e..82915511e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/JobClustersManagerActor.java @@ -92,6 +92,8 @@ import io.mantisrx.master.jobcluster.job.JobState; import io.mantisrx.master.jobcluster.proto.BaseResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ResubmitWorkerRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterLabelsRequest; @@ -110,6 +112,7 @@ import io.mantisrx.server.master.persistence.MantisJobStore; import io.mantisrx.server.master.scheduler.MantisSchedulerFactory; import io.mantisrx.server.master.scheduler.WorkerEvent; +import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting; import io.mantisrx.shaded.com.google.common.collect.Lists; import java.time.Duration; import java.util.ArrayList; @@ -140,23 +143,31 @@ public class JobClustersManagerActor extends AbstractActorWithTimers implements private final Counter numJobClusterInitFailures; private final Counter numJobClusterInitSuccesses; private Receive initializedBehavior; + + @VisibleForTesting public static Props props(final MantisJobStore jobStore, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) { - return Props.create(JobClustersManagerActor.class, jobStore, eventPublisher, costsCalculator) + return props(jobStore, eventPublisher, costsCalculator, Collections.emptyList()); + } + + public static Props props(final MantisJobStore jobStore, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator, final List namedJobsReferToLaunched) { + return Props.create(JobClustersManagerActor.class, jobStore, eventPublisher, costsCalculator, namedJobsReferToLaunched) .withMailbox("akka.actor.metered-mailbox"); } private final MantisJobStore jobStore; private final LifecycleEventPublisher eventPublisher; private final CostsCalculator costsCalculator; + private final List namedJobsReferToLaunched; private MantisSchedulerFactory mantisSchedulerFactory = null; JobClusterInfoManager jobClusterInfoManager; private ActorRef jobListHelperActor; - public JobClustersManagerActor(final MantisJobStore store, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) { + public JobClustersManagerActor(final MantisJobStore store, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator, final List namedJobsReferToLaunched) { this.jobStore = store; this.eventPublisher = eventPublisher; this.costsCalculator = costsCalculator; + this.namedJobsReferToLaunched = namedJobsReferToLaunched; MetricGroupId metricGroupId = getMetricGroupId(); Metrics m = new Metrics.Builder() @@ -234,6 +245,7 @@ private Receive getInitializedBehavior() { .match(GetJobClusterRequest.class, this::onJobClusterGet) .match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted) .match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject) + .match(GetLastLaunchedJobIdStreamRequest.class, this::onGetLastLaunchedJobIdSubject) .match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers) // List Job Cluster related messages .match(ListJobClustersRequest.class, this::onJobClustersList) @@ -291,6 +303,7 @@ private Receive getInitializingBehavior() { .match(GetJobClusterRequest.class, (x) -> getSender().tell(new GetJobClusterResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf())) .match(ListCompletedJobsInClusterRequest.class, (x) -> logger.warn(genUnexpectedMsg(x.toString(), state))) .match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf())) + .match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf())) .match(ListArchivedWorkersRequest.class, (x) -> getSender().tell(new ListArchivedWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf())) .match(ListJobClustersRequest.class, (x) -> getSender().tell(new ListJobClustersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf())) .match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf())) @@ -323,7 +336,7 @@ private void initialize(JobClustersManagerInitialize initMsg) { mantisSchedulerFactory = initMsg.getScheduler(); Map jobClusterMap = new HashMap<>(); - this.jobClusterInfoManager = new JobClusterInfoManager(jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator); + this.jobClusterInfoManager = new JobClusterInfoManager(jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator, namedJobsReferToLaunched); if (!initMsg.isLoadJobsFromStore()) { getContext().become(initializedBehavior); @@ -522,6 +535,18 @@ public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r) sender.tell(new GetLastSubmittedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf()); } } + + @Override + public void onGetLastLaunchedJobIdSubject(GetLastLaunchedJobIdStreamRequest r) { + Optional jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName()); + ActorRef sender = getSender(); + if (jobClusterInfo.isPresent()) { + jobClusterInfo.get().jobClusterActor.forward(r, getContext()); + } else { + sender.tell(new GetLastLaunchedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf()); + } + } + @Override public void onWorkerEvent(WorkerEvent workerEvent) { if(logger.isDebugEnabled()) { logger.debug("Entering JobClusterManagerActor:onWorkerEvent {}", workerEvent); } @@ -826,12 +851,14 @@ class JobClusterInfoManager { private final MantisJobStore jobStore; private final Metrics metrics; private final CostsCalculator costsCalculator; + private final List namedJobsReferToLaunched; - JobClusterInfoManager(MantisJobStore jobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator) { + JobClusterInfoManager(MantisJobStore jobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator, List namedJobsReferToLaunched) { this.eventPublisher = eventPublisher; this.mantisSchedulerFactory = mantisSchedulerFactory; this.jobStore = jobStore; this.costsCalculator = costsCalculator; + this.namedJobsReferToLaunched = namedJobsReferToLaunched; MetricGroupId metricGroupId = new MetricGroupId("JobClusterInfoManager"); @@ -859,7 +886,7 @@ Optional createClusterActorAndRegister(IJobClusterDefinition job } ActorRef jobClusterActor = getContext().actorOf( - JobClusterActor.props(clusterName, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator), + JobClusterActor.props(clusterName, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator, this.namedJobsReferToLaunched), "JobClusterActor-" + clusterName); getContext().watch(jobClusterActor); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java index 42532e499..3406ab1da 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java @@ -61,6 +61,7 @@ import io.mantisrx.server.core.master.MasterMonitor; import io.mantisrx.server.master.ILeadershipManager; import io.mantisrx.server.master.LeaderRedirectionFilter; +import io.mantisrx.server.master.config.MasterConfiguration; import io.mantisrx.server.master.persistence.IMantisPersistenceProvider; import io.mantisrx.server.master.resourcecluster.ResourceClusters; import java.util.concurrent.CompletionStage; @@ -91,6 +92,7 @@ public class MasterApiAkkaService extends BaseService { private final Materializer materializer; private final ExecutorService executorService; private final CountDownLatch serviceLatch = new CountDownLatch(1); + private final MasterConfiguration masterConfig; public MasterApiAkkaService(final MasterMonitor masterMonitor, final MasterDescription masterDescription, @@ -101,7 +103,8 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor, final int serverPort, final IMantisPersistenceProvider mantisStorageProvider, final LifecycleEventPublisher lifecycleEventPublisher, - final ILeadershipManager leadershipManager) { + final ILeadershipManager leadershipManager, + final MasterConfiguration masterConfig) { super(true); Preconditions.checkNotNull(masterMonitor, "MasterMonitor"); Preconditions.checkNotNull(masterDescription, "masterDescription"); @@ -110,6 +113,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor, Preconditions.checkNotNull(mantisStorageProvider, "mantisStorageProvider"); Preconditions.checkNotNull(lifecycleEventPublisher, "lifecycleEventPublisher"); Preconditions.checkNotNull(leadershipManager, "leadershipManager"); + this.masterConfig = masterConfig; this.masterMonitor = masterMonitor; this.masterDescription = masterDescription; this.jobClustersManagerActor = jobClustersManagerActor; @@ -151,7 +155,7 @@ private MantisMasterRoute configureApiRoutes(final ActorSystem actorSystem) { final JobStatusRouteHandler jobStatusRouteHandler = new JobStatusRouteHandlerAkkaImpl(actorSystem, statusEventBrokerActor); final JobDiscoveryRouteHandler jobDiscoveryRouteHandler = new JobDiscoveryRouteHandlerAkkaImpl(jobClustersManagerActor, idleTimeout); - final JobDiscoveryRoute v0JobDiscoveryRoute = new JobDiscoveryRoute(jobDiscoveryRouteHandler); + final JobDiscoveryRoute v0JobDiscoveryRoute = new JobDiscoveryRoute(masterConfig.getNamedJobsReferToLaunched(), jobDiscoveryRouteHandler); final JobClusterRoute v0JobClusterRoute = new JobClusterRoute(jobClusterRouteHandler, jobRouteHandler, actorSystem); final JobStatusRoute v0JobStatusRoute = new JobStatusRoute(jobStatusRouteHandler); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java index 83c9d89b5..dc881c870 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandler.java @@ -27,4 +27,6 @@ CompletionStage schedulingInfoStream(final JobClusterManagerP final boolean sendHeartbeats); CompletionStage lastSubmittedJobIdStream(final JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request, final boolean sendHeartbeats); + CompletionStage lastLaunchedJobIdStream(final JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request, + final boolean sendHeartbeats); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java index e7b45555a..f6c72b831 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.java @@ -27,9 +27,12 @@ import io.mantisrx.common.metrics.Metrics; import io.mantisrx.master.api.akka.route.proto.JobClusterInfo; import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto; +import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto.JobClusterInfoResponse; import io.mantisrx.master.jobcluster.proto.BaseResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoResponse; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse; import io.mantisrx.server.core.JobSchedulingInfo; @@ -58,9 +61,11 @@ public class JobDiscoveryRouteHandlerAkkaImpl implements JobDiscoveryRouteHandle private final Counter schedInfoStreamErrors; private final Counter lastSubmittedJobIdStreamErrors; + private final Counter lastLaunchedJobIdStreamErrors; private final AsyncLoadingCache schedInfoCache; private final AsyncLoadingCache lastSubmittedJobIdStreamRespCache; + private final AsyncLoadingCache lastLaunchedJobIdStreamRespCache; public JobDiscoveryRouteHandlerAkkaImpl(ActorRef jobClustersManagerActor, Duration serverIdleTimeout) { this.jobClustersManagerActor = jobClustersManagerActor; @@ -77,13 +82,20 @@ public JobDiscoveryRouteHandlerAkkaImpl(ActorRef jobClustersManagerActor, Durati .maximumSize(500) .buildAsync(this::lastSubmittedJobId); + lastLaunchedJobIdStreamRespCache = Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.SECONDS) + .maximumSize(500) + .buildAsync(this::lastLaunchedJobId); + Metrics m = new Metrics.Builder() .id("JobDiscoveryRouteHandlerAkkaImpl") .addCounter("schedInfoStreamErrors") .addCounter("lastSubmittedJobIdStreamErrors") + .addCounter("lastLaunchedJobIdStreamErrors") .build(); this.schedInfoStreamErrors = m.getCounter("schedInfoStreamErrors"); this.lastSubmittedJobIdStreamErrors = m.getCounter("lastSubmittedJobIdStreamErrors"); + this.lastLaunchedJobIdStreamErrors = m.getCounter("lastLaunchedJobIdStreamErrors"); } @@ -159,47 +171,79 @@ private CompletableFuture lastSubmittedJobI } @Override - public CompletionStage lastSubmittedJobIdStream(final GetLastSubmittedJobIdStreamRequest request, - final boolean sendHeartbeats) { - CompletionStage response = lastSubmittedJobIdStreamRespCache.get(request); - try { - return response - .thenApply(lastSubmittedJobIdResp -> { - Optional> jobIdSubjectO = lastSubmittedJobIdResp.getjobIdBehaviorSubject(); - if (lastSubmittedJobIdResp.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) && jobIdSubjectO.isPresent()) { - Observable jobClusterInfoObs = jobIdSubjectO.get().map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId())); - - Observable heartbeats = - Observable.interval(5, serverIdleConnectionTimeout.getSeconds() - 1, TimeUnit.SECONDS) - .map(x -> JOB_CLUSTER_INFO_HB_INSTANCE) - .takeWhile(x -> sendHeartbeats == true); - - Observable jobClusterInfoWithHB = Observable.merge(jobClusterInfoObs, heartbeats); - return new JobDiscoveryRouteProto.JobClusterInfoResponse( - lastSubmittedJobIdResp.requestId, - lastSubmittedJobIdResp.responseCode, - lastSubmittedJobIdResp.message, - jobClusterInfoWithHB - ); - } else { - logger.info("Failed to get lastSubmittedJobId stream for job cluster {}", request.getClusterName()); - lastSubmittedJobIdStreamErrors.increment(); - return new JobDiscoveryRouteProto.JobClusterInfoResponse( - lastSubmittedJobIdResp.requestId, - lastSubmittedJobIdResp.responseCode, - lastSubmittedJobIdResp.message - ); - } - }); + public CompletionStage lastSubmittedJobIdStream( + final GetLastSubmittedJobIdStreamRequest request, final boolean sendHeartbeats) { + logger.info("[fdc-91] lastSubmittedJobIdStream --> {}", request.getClusterName()); + try { + CompletionStage response = lastSubmittedJobIdStreamRespCache.get(request); + return response.thenApply(r -> streamJobIdBehaviorSubject(r, r.getjobIdBehaviorSubject(), sendHeartbeats, lastSubmittedJobIdStreamErrors)); } catch (Exception e) { logger.error("caught exception fetching lastSubmittedJobId stream for {}", request.getClusterName(), e); lastSubmittedJobIdStreamErrors.increment(); - return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.JobClusterInfoResponse( + return CompletableFuture.completedFuture(new JobClusterInfoResponse( 0, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get last submitted jobId stream for " + request.getClusterName() + " error: " + e.getMessage() )); } } + + private CompletableFuture lastLaunchedJobId(final GetLastLaunchedJobIdStreamRequest request, Executor executor) { + return ask(jobClustersManagerActor, request, askTimeout) + .thenApply(GetLastLaunchedJobIdStreamResponse.class::cast) + .toCompletableFuture(); + } + + @Override + public CompletionStage lastLaunchedJobIdStream( + final GetLastLaunchedJobIdStreamRequest request, final boolean sendHeartbeats) { + + logger.info("[fdc-91] lastLaunchedJobIdStream --> {}", request.getClusterName()); + try { + CompletionStage response = lastLaunchedJobIdStreamRespCache.get(request); + return response.thenApply(r -> streamJobIdBehaviorSubject(r, r.getjobIdBehaviorSubject(), sendHeartbeats, lastLaunchedJobIdStreamErrors)); + } catch (Exception e) { + logger.error("caught exception fetching lastLaunchedJobId stream for {}", request.getClusterName(), e); + lastLaunchedJobIdStreamErrors.increment(); + return CompletableFuture.completedFuture(new JobClusterInfoResponse( + 0, + BaseResponse.ResponseCode.SERVER_ERROR, + "Failed to get last launched jobId stream for " + request.getClusterName() + " error: " + e.getMessage() + )); + } + } + + /** + * + * @param response response from actor + * @param jobIdSubjectO BehaviorSubject that exposes latest jobId for a jobCluster in Accepted/Launched state depending on endpoint + */ + private JobClusterInfoResponse streamJobIdBehaviorSubject( + BaseResponse response, Optional> jobIdSubjectO, + boolean sendHeartbeats, Counter counter) { + if (response.responseCode.equals(BaseResponse.ResponseCode.SUCCESS) && jobIdSubjectO.isPresent()) { + Observable jobClusterInfoObs = jobIdSubjectO.get().map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId())); + + Observable heartbeats = + Observable.interval(5, serverIdleConnectionTimeout.getSeconds() - 1, TimeUnit.SECONDS) + .map(x -> JOB_CLUSTER_INFO_HB_INSTANCE) + .takeWhile(x -> sendHeartbeats); + + Observable jobClusterInfoWithHB = Observable.merge(jobClusterInfoObs, heartbeats); + return new JobClusterInfoResponse( + response.requestId, + response.responseCode, + response.message, + jobClusterInfoWithHB + ); + } else { + counter.increment(); + return new JobClusterInfoResponse( + response.requestId, + response.responseCode, + response.message + ); + } + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java index b08a87231..8648351b5 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v0/JobDiscoveryRoute.java @@ -39,6 +39,9 @@ import io.mantisrx.server.core.JobSchedulingInfo; import io.mantisrx.server.master.domain.JobId; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -49,6 +52,9 @@ public class JobDiscoveryRoute extends BaseRoute { private static final Logger logger = LoggerFactory.getLogger(JobDiscoveryRoute.class); + + // return latest Launched job + private final List namedJobsReferToLaunched; private final JobDiscoveryRouteHandler jobDiscoveryRouteHandler; private final Metrics metrics; @@ -56,6 +62,11 @@ public class JobDiscoveryRoute extends BaseRoute { private final Counter jobClusterInfoStreamGET; public JobDiscoveryRoute(final JobDiscoveryRouteHandler jobDiscoveryRouteHandler) { + this(Collections.emptyList(), jobDiscoveryRouteHandler); + } + + public JobDiscoveryRoute(final List namedJobsReferToLaunched, final JobDiscoveryRouteHandler jobDiscoveryRouteHandler) { + this.namedJobsReferToLaunched = namedJobsReferToLaunched; this.jobDiscoveryRouteHandler = jobDiscoveryRouteHandler; Metrics m = new Metrics.Builder() .id("JobDiscoveryRoute") @@ -103,13 +114,9 @@ private Route getJobDiscoveryRoutes() { .get(); Source schedInfoSource = Source.fromPublisher( - RxReactiveStreams.toPublisher( - schedulingInfoObs)) - .map(j -> StreamingUtils.from( - j) - .orElse(null)) - .filter(sse -> sse != - null); + RxReactiveStreams.toPublisher(schedulingInfoObs)) + .map(j -> StreamingUtils.from(j).orElse(null)) + .filter(Objects::nonNull); return completeOK( schedInfoSource, EventStreamMarshalling.toEventStream()); @@ -134,14 +141,24 @@ private Route getJobDiscoveryRoutes() { "/namedjobs/{} called", jobCluster); jobClusterInfoStreamGET.increment(); - JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest req = + final CompletionStage jobClusterInfoRespCS; + boolean jobClusterRefersToLaunched = namedJobsReferToLaunched.stream() + .anyMatch(item -> item.equalsIgnoreCase(jobCluster)); + if (jobClusterRefersToLaunched) { + JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest req = + new JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest( + jobCluster); + + jobClusterInfoRespCS = jobDiscoveryRouteHandler.lastLaunchedJobIdStream( + req, sendHeartbeats.orElse(false)); + } else { + JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest req = new JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest( jobCluster); - CompletionStage jobClusterInfoRespCS = - jobDiscoveryRouteHandler.lastSubmittedJobIdStream( - req, - sendHeartbeats.orElse(false)); + jobClusterInfoRespCS = jobDiscoveryRouteHandler.lastSubmittedJobIdStream( + req, sendHeartbeats.orElse(false)); + } return completeAsync( jobClusterInfoRespCS, r -> { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java index f8a7a50a3..960205fe3 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/IJobClusterManager.java @@ -88,6 +88,8 @@ public interface IJobClusterManager { void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest request); void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request); + void onGetLastLaunchedJobIdSubject(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request); + void onEnforceSLARequest(EnforceSLARequest request); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java index 83e102a32..61fcd9e9e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java @@ -73,6 +73,8 @@ import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobSchedInfoResponse; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest; @@ -132,6 +134,7 @@ import io.mantisrx.server.master.scheduler.MantisScheduler; import io.mantisrx.server.master.scheduler.MantisSchedulerFactory; import io.mantisrx.server.master.scheduler.WorkerEvent; +import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting; import io.mantisrx.shaded.com.google.common.base.Throwables; import io.mantisrx.shaded.com.google.common.collect.Lists; import java.io.IOException; @@ -139,6 +142,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -197,14 +201,24 @@ public class JobClusterActor extends AbstractActorWithTimers implements IJobClus private final Counter numJobClusterUpdateErrors; private final Counter numSLAEnforcementExecutions; - + @VisibleForTesting public static Props props( final String name, final MantisJobStore jobStore, final MantisSchedulerFactory mantisSchedulerFactory, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) { - return Props.create(JobClusterActor.class, name, jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator); + return props(name, jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator, Collections.emptyList()); + } + + public static Props props( + final String name, + final MantisJobStore jobStore, + final MantisSchedulerFactory mantisSchedulerFactory, + final LifecycleEventPublisher eventPublisher, + final CostsCalculator costsCalculator, + final List namedJobsReferToLaunched) { + return Props.create(JobClusterActor.class, name, jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator, namedJobsReferToLaunched); } private final Receive initializedBehavior; @@ -215,6 +229,7 @@ public static Props props( private final MantisJobStore jobStore; private IJobClusterMetadata jobClusterMetadata; private CronManager cronManager; + private List namedJobsReferToLaunched; private SLAEnforcer slaEnforcer; private final JobManager jobManager; @@ -222,6 +237,8 @@ public static Props props( private final LifecycleEventPublisher eventPublisher; private final BehaviorSubject jobIdSubmissionSubject; + private final BehaviorSubject jobIdLaunchedSubject; + private final JobDefinitionResolver jobDefinitionResolver = new JobDefinitionResolver(); private final Metrics metrics; @@ -231,15 +248,18 @@ public JobClusterActor( final MantisJobStore jobStore, final MantisSchedulerFactory schedulerFactory, final LifecycleEventPublisher eventPublisher, - final CostsCalculator costsCalculator) { + final CostsCalculator costsCalculator, + final List namedJobsReferToLaunched) { this.name = name; this.jobStore = jobStore; this.mantisSchedulerFactory = schedulerFactory; this.eventPublisher = eventPublisher; + this.namedJobsReferToLaunched = namedJobsReferToLaunched; this.jobManager = new JobManager(name, getContext(), mantisSchedulerFactory, eventPublisher, jobStore, costsCalculator); jobIdSubmissionSubject = BehaviorSubject.create(); + jobIdLaunchedSubject = BehaviorSubject.create(); initializedBehavior = buildInitializedBehavior(); disabledBehavior = buildDisabledBehavior(); @@ -422,6 +442,7 @@ private Receive buildDisabledBehavior() { .match(GetJobSchedInfoRequest.class, (x) -> getSender().tell(new GetJobSchedInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLatestJobDiscoveryInfoRequest.class, (x) -> getSender().tell(new GetLatestJobDiscoveryInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) + .match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(ListJobIdsRequest.class, (x) -> getSender().tell(new ListJobIdsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), new ArrayList()), getSelf())) .match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), new ArrayList()), getSelf())) .match(ListWorkersRequest.class, (x) -> getSender().tell(new ListWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), new ArrayList()), getSelf())) @@ -516,6 +537,7 @@ private Receive buildInitialBehavior() { .match(GetJobSchedInfoRequest.class, (x) -> getSender().tell(new GetJobSchedInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLatestJobDiscoveryInfoRequest.class, (x) -> getSender().tell(new GetLatestJobDiscoveryInfoResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) + .match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), empty()), getSelf())) .match(ListJobIdsRequest.class, (x) -> getSender().tell(new ListJobIdsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), getSelf())) .match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), getSelf())) .match(ListWorkersRequest.class, (x) -> getSender().tell(new ListWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), this.name, state), Lists.newArrayList()), getSelf())) @@ -610,6 +632,7 @@ private Receive buildInitializedBehavior() { .match(JobProto.JobInitialized.class, this::onJobInitialized) .match(JobStartedEvent.class, this::onJobStarted) .match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject) + .match(GetLastLaunchedJobIdStreamRequest.class, this::onGetLastLaunchedJobIdSubject) .match(ScaleStageRequest.class, this::onScaleStage) // EXPECTED MESSAGES END // // EXPECTED MESSAGES BEGIN // @@ -854,7 +877,10 @@ private void initRunningJobs(JobClusterProto.InitializeJobClusterRequest initReq JobId lastJobId = new JobId(this.name, initReq.lastJobNumber); this.jobIdSubmissionSubject.onNext(lastJobId); } - + initReq.jobList.stream() + .filter(m -> m.getState() == JobState.Launched) + .max(Comparator.comparingLong(m -> m.getJobId().getJobNum())) + .ifPresent(m -> this.jobIdLaunchedSubject.onNext(m.getJobId())); setBookkeepingTimer(BOOKKEEPING_INTERVAL_SECS); @@ -1615,13 +1641,14 @@ public void onJobInitialized(JobProto.JobInitialized jobInited) { public void onJobStarted(final JobStartedEvent startedEvent) { logger.info("job {} started event", startedEvent.jobid); - Optional jobInfoOp = jobManager.getJobInfoForNonTerminalJob(startedEvent.jobid); - - if(jobInfoOp.isPresent()) { - // enforce SLA - jobManager.markJobStarted(jobInfoOp.get()); - getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), of(jobInfoOp.get().jobDefinition)), getSelf()); - } + jobManager + .getJobInfoForNonTerminalJob(startedEvent.jobid) + .ifPresent(jobInfo -> { + jobIdLaunchedSubject.onNext(startedEvent.jobid); + jobManager.markJobStarted(jobInfo); + // Enforce SLA + getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), of(jobInfo.jobDefinition)), getSelf()); + }); } @@ -1905,7 +1932,15 @@ public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDisco if(logger.isTraceEnabled()) { logger.trace("Enter onGetLatestJobDiscoveryInfo {}", request); } ActorRef sender = getSender(); if(this.name.equals(request.getJobCluster())) { - JobId latestJobId = jobIdSubmissionSubject.getValue(); + // TODO: I think this might be wrong. + boolean jobClusterRefersToLaunched = namedJobsReferToLaunched.stream() + .anyMatch(item -> item.equalsIgnoreCase(request.getJobCluster())); + JobId latestJobId; + if (jobClusterRefersToLaunched) { + latestJobId = jobIdLaunchedSubject.getValue(); + } else { + latestJobId = jobIdSubmissionSubject.getValue(); + } logger.debug("[{}] latest job Id for cluster: {}", name, latestJobId); if (latestJobId != null) { Optional jInfo = jobManager.getJobInfoForNonTerminalJob(latestJobId); @@ -1970,6 +2005,18 @@ public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest re } + @Override + public void onGetLastLaunchedJobIdSubject(GetLastLaunchedJobIdStreamRequest request) { + ActorRef sender = getSender(); + if(this.name.equals(request.getClusterName())) { + sender.tell(new GetLastLaunchedJobIdStreamResponse(request.requestId, SUCCESS,"", of(this.jobIdLaunchedSubject)),getSelf()); + } else { + String msg = "Job Cluster " + request.getClusterName() + " In request does not match the name of this actor " + this.name; + logger.warn(msg); + sender.tell(new GetLastLaunchedJobIdStreamResponse(request.requestId, CLIENT_ERROR ,msg,empty()),getSelf()); + } + } + @Override public void onBookkeepingRequest(JobClusterProto.BookkeepingRequest request) { if(logger.isTraceEnabled()) { logger.trace("Enter onBookkeepingRequest for JobCluster {}", this.name); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java index b4ca6bfdf..b4e903e57 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java @@ -55,6 +55,8 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; +import lombok.Value; +import org.apache.commons.lang3.StringUtils; import rx.subjects.BehaviorSubject; public class JobClusterManagerProto { @@ -1903,6 +1905,17 @@ public Optional> getJobSchedInfoSubject() { } } + @Value + @EqualsAndHashCode(callSuper = false) + public static class GetLastLaunchedJobIdStreamRequest extends BaseRequest { + String clusterName; + + public GetLastLaunchedJobIdStreamRequest(final String clusterName) { + Preconditions.checkArg(StringUtils.isNotBlank(clusterName), "Must provide job cluster name in request"); + this.clusterName = clusterName; + } + } + /** * Stream of JobId submissions for a cluster */ @@ -1945,6 +1958,23 @@ public String toString() { } } + public static final class GetLastLaunchedJobIdStreamResponse extends BaseResponse { + Optional> jobIdBehaviorSubject; + + public GetLastLaunchedJobIdStreamResponse( + final long requestId, + final ResponseCode code, + final String msg, + final Optional> jobIdBehaviorSubject) { + super(requestId, code, msg); + this.jobIdBehaviorSubject = jobIdBehaviorSubject; + } + + public Optional> getjobIdBehaviorSubject() { + return this.jobIdBehaviorSubject; + } + } + public static final class GetLastSubmittedJobIdStreamResponse extends BaseResponse { private final Optional> jobIdBehaviorSubject; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java index 63b62fecd..0b70e276a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java @@ -158,7 +158,7 @@ public MasterMain( storageProvider = new KeyValueBasedPersistenceProvider(this.config.getStorageProvider(), lifecycleEventPublisher); final MantisJobStore mantisJobStore = new MantisJobStore(storageProvider); - final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher, config.getJobCostsCalculator()), "JobClustersManager"); + final ActorRef jobClusterManagerActor = system.actorOf(JobClustersManagerActor.props(mantisJobStore, lifecycleEventPublisher, config.getJobCostsCalculator(), config.getNamedJobsReferToLaunched()), "JobClustersManager"); final JobMessageRouter jobMessageRouter = new JobMessageRouterImpl(jobClusterManagerActor); // Beginning of new stuff @@ -206,14 +206,14 @@ public MasterMain( if (this.config.isLocalMode()) { mantisServices.addService(new MasterApiAkkaService(new LocalMasterMonitor(leadershipManager.getDescription()), leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor, - resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager)); + resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager, config)); leadershipManager.becomeLeader(); } else { curatorService = new CuratorService(this.config); curatorService.start(); mantisServices.addService(createLeaderElector(curatorService, leadershipManager)); mantisServices.addService(new MasterApiAkkaService(curatorService.getMasterMonitor(), leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor, - resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager)); + resourceClusters, resourceClustersHostActor, config.getApiPort(), storageProvider, lifecycleEventPublisher, leadershipManager, config)); } m.getCounter("masterInitSuccess").increment(); } catch (Exception e) { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java index 53440a627..aa6645d21 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java @@ -23,6 +23,7 @@ import io.mantisrx.shaded.com.google.common.base.Splitter; import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; import java.time.Duration; +import java.util.List; import java.util.Map; import org.skife.config.Config; import org.skife.config.Default; @@ -364,6 +365,10 @@ default Duration getSchedulerIntervalBetweenRetries() { @Default("") String getSchedulingConstraintsString(); + @Config("mantis.route.namedjobs.useLaunched") + @Default("") + String getNamedJobsReferToLaunchedString(); + default Duration getHeartbeatInterval() { return Duration.ofMillis(getHeartbeatIntervalInMs()); } @@ -373,4 +378,8 @@ default Duration getMaxAssignmentThreshold() { } default Map getSchedulingConstraints() { return getSchedulingConstraintsString().isEmpty() ? ImmutableMap.of() : Splitter.on(",").withKeyValueSeparator(':').split(getSchedulingConstraintsString());} + + default List getNamedJobsReferToLaunched() { + return Splitter.on(",").omitEmptyStrings().trimResults().splitToList(getNamedJobsReferToLaunchedString()); + } }