diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java index bd94ff8df..67d5e7cc8 100644 --- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java +++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java @@ -98,7 +98,6 @@ public class WorkerExecutionOperationsNetworkStage implements WorkerExecutionOpe private final AtomicReference heartbeatRef = new AtomicReference<>(); private final SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory; private final MantisMasterGateway mantisMasterApi; - private int connectionsPerEndpoint = 2; private boolean lookupSpectatorRegistry = true; private SinkSubscriptionStateHandler subscriptionStateHandler; private Action0 onSinkSubscribe = null; @@ -120,12 +119,6 @@ public WorkerExecutionOperationsNetworkStage( this.sinkSubscriptionStateHandlerFactory = sinkSubscriptionStateHandlerFactory; this.classLoader = classLoader; - String connectionsPerEndpointStr = - ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.connectionsPerEndpoint", "2"); - if (connectionsPerEndpointStr != null && !connectionsPerEndpointStr.equals("2")) { - connectionsPerEndpoint = Integer.parseInt(connectionsPerEndpointStr); - } - String locateSpectatorRegistry = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.locate.spectator.registry", "true"); lookupSpectatorRegistry = Boolean.valueOf(locateSpectatorRegistry); @@ -472,7 +465,7 @@ public int acquirePort() { WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable<>(rw.getPorts().next(), - remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), + remoteObservableName, rw.getJobName(), getRouterFactoryInstance(serviceLocator)); closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(), @@ -614,7 +607,7 @@ public void call() { WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable<>(workerPort, remoteObservableName, - numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName(), + rw.getJobName(), getRouterFactoryInstance(serviceLocator) ); closeables.add(StageExecutors.executeIntermediate(consumer, rw.getStage(), publisher, @@ -633,25 +626,6 @@ public void call() { } } - private Observable numWorkersAtStage(Observable selfSchedulingInfo, String jobId, final int stageNum) { - //return mantisMasterApi.schedulingChanges(jobId) - return selfSchedulingInfo - .distinctUntilChanged((prevJobSchedInfo, currentJobSchedInfo) -> (!prevJobSchedInfo.equals(currentJobSchedInfo)) ? false : true) - .flatMap((Func1>) schedulingChange -> { - Map assignments = schedulingChange.getWorkerAssignments(); - if (assignments != null && !assignments.isEmpty()) { - return Observable.from(assignments.values()); - } else { - return Observable.empty(); - } - }) - .filter(assignments -> (assignments.getStage() == stageNum)) - .map(assignments -> { - return assignments.getNumWorkers() * connectionsPerEndpoint; // scale by numConnections - }).share(); - - } - @SuppressWarnings( {"rawtypes"}) private WorkerConsumer connectToObservableAtPreviousStages(Observable selfSchedulingInfo, final String jobId, final int previousStageNum, int numInstanceAtPreviousStage, final StageConfig previousStage, final AtomicBoolean acceptSchedulingChanges, @@ -659,6 +633,17 @@ private WorkerConsumer connectToObservableAtPreviousStages(Observable> schedulingUpdates = mantisMasterApi.schedulingChanges(jobId) Observable> schedulingUpdates = selfSchedulingInfo .flatMap((Func1>) schedulingChange -> { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/LocalJobExecutorNetworked.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/LocalJobExecutorNetworked.java index c926d91b4..389f61e42 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/LocalJobExecutorNetworked.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/LocalJobExecutorNetworked.java @@ -79,7 +79,7 @@ private static void startSource(int index, int port, int workersAtNextStage, Sou logger.debug("Creating source publisher on port " + port); WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable<>(port, - null, Observable.just(workersAtNextStage * numPartitions), null); // name is set to null, defaul + null, null); // name is set to null, default // to start job StageExecutors.executeSource(index, source, stage, publisher, context, stageWorkersObservable); } @@ -104,7 +104,7 @@ private static void startIntermediate(int[] previousStagePorts, logger.debug("Creating intermediate publisher on port " + port); WorkerPublisherRemoteObservable intermediatePublisher = new WorkerPublisherRemoteObservable<>(port, - null, Observable.just(workersAtNextStage * numPartitions), null); // name is null for local + null, null); // name is null for local StageExecutors.executeIntermediate(intermediateConsumer, stage, intermediatePublisher, context); } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 73092c926..2dcfbf394 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -55,15 +55,11 @@ public class WorkerPublisherRemoteObservable implements WorkerPublisher { private String jobName; private RouterFactory routerFactory; - public WorkerPublisherRemoteObservable(int serverPort, - String name, Observable minConnectionsToSubscribe, - String jobName) { - this(serverPort, name, minConnectionsToSubscribe, jobName, new Routers()); + public WorkerPublisherRemoteObservable(int serverPort, String name, String jobName) { + this(serverPort, name, jobName, new Routers()); } - public WorkerPublisherRemoteObservable(int serverPort, - String name, Observable minConnectionsToSubscribe, - String jobName, RouterFactory routerFactory) { + public WorkerPublisherRemoteObservable(int serverPort, String name, String jobName, RouterFactory routerFactory) { this.name = name; this.serverPort = serverPort; this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); diff --git a/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsGroupByTest.java b/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsGroupByTest.java index e66867480..b0d78af38 100644 --- a/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsGroupByTest.java +++ b/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsGroupByTest.java @@ -53,8 +53,7 @@ public void testExecuteSource() { PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000, 9000); int serverPort = portSelector.acquirePort(); - WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null, - Observable.just(1), null); + WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null, null); // execute source BehaviorSubject workersInStageOneObservable = BehaviorSubject.create(1); @@ -152,8 +151,7 @@ public void call(Subscriber subscriber) { }; WorkerConsumer consumer = new WorkerConsumerRemoteObservable(null, staticEndpoints); - WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null, - Observable.just(1), null); + WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null, null); // execute source StageExecutors.executeIntermediate(consumer, stages.get(1), producer, new Context()); diff --git a/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsTest.java b/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsTest.java index f5997d512..ad1373460 100644 --- a/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsTest.java +++ b/mantis-runtime/src/test/java/io/mantisrx/runtime/executor/StageExecutorsTest.java @@ -53,8 +53,7 @@ public void testExecuteSource() { PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000, 9000); int serverPort = portSelector.acquirePort(); - WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null, - Observable.just(1), null); + WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null, null); // execute source BehaviorSubject workersInStageOneObservable = BehaviorSubject.create(1); StageExecutors.executeSource(0, job.getSource(), stages.get(0), producer, @@ -108,8 +107,7 @@ public void call(Subscriber subscriber) { }; WorkerConsumer consumer = new WorkerConsumerRemoteObservable(null, staticEndpoints); - WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null, - Observable.just(1), null); + WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null, null); // execute intermediate, flatten results StageExecutors.executeIntermediate(consumer, stages.get(1), producer, new Context());