Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public class WorkerExecutionOperationsNetworkStage implements WorkerExecutionOpe
private final AtomicReference<Heartbeat> heartbeatRef = new AtomicReference<>();
private final SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory;
private final MantisMasterGateway mantisMasterApi;
private int connectionsPerEndpoint = 2;
private boolean lookupSpectatorRegistry = true;
private SinkSubscriptionStateHandler subscriptionStateHandler;
private Action0 onSinkSubscribe = null;
Expand All @@ -120,12 +119,6 @@ public WorkerExecutionOperationsNetworkStage(
this.sinkSubscriptionStateHandlerFactory = sinkSubscriptionStateHandlerFactory;
this.classLoader = classLoader;

String connectionsPerEndpointStr =
ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.connectionsPerEndpoint", "2");
if (connectionsPerEndpointStr != null && !connectionsPerEndpointStr.equals("2")) {
connectionsPerEndpoint = Integer.parseInt(connectionsPerEndpointStr);
}

String locateSpectatorRegistry =
ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.locate.spectator.registry", "true");
lookupSpectatorRegistry = Boolean.valueOf(locateSpectatorRegistry);
Expand Down Expand Up @@ -472,7 +465,7 @@ public int acquirePort() {

WorkerPublisherRemoteObservable publisher
= new WorkerPublisherRemoteObservable<>(rw.getPorts().next(),
remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1),
remoteObservableName,
rw.getJobName(), getRouterFactoryInstance(serviceLocator));

closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(),
Expand Down Expand Up @@ -614,7 +607,7 @@ public void call() {

WorkerPublisherRemoteObservable publisher
= new WorkerPublisherRemoteObservable<>(workerPort, remoteObservableName,
numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName(),
rw.getJobName(),
getRouterFactoryInstance(serviceLocator)
);
closeables.add(StageExecutors.executeIntermediate(consumer, rw.getStage(), publisher,
Expand All @@ -633,32 +626,24 @@ public void call() {
}
}

private Observable<Integer> numWorkersAtStage(Observable<JobSchedulingInfo> selfSchedulingInfo, String jobId, final int stageNum) {
//return mantisMasterApi.schedulingChanges(jobId)
return selfSchedulingInfo
.distinctUntilChanged((prevJobSchedInfo, currentJobSchedInfo) -> (!prevJobSchedInfo.equals(currentJobSchedInfo)) ? false : true)
.flatMap((Func1<JobSchedulingInfo, Observable<WorkerAssignments>>) schedulingChange -> {
Map<Integer, WorkerAssignments> assignments = schedulingChange.getWorkerAssignments();
if (assignments != null && !assignments.isEmpty()) {
return Observable.from(assignments.values());
} else {
return Observable.empty();
}
})
.filter(assignments -> (assignments.getStage() == stageNum))
.map(assignments -> {
return assignments.getNumWorkers() * connectionsPerEndpoint; // scale by numConnections
}).share();

}

@SuppressWarnings( {"rawtypes"})
private WorkerConsumer connectToObservableAtPreviousStages(Observable<JobSchedulingInfo> selfSchedulingInfo, final String jobId, final int previousStageNum,
int numInstanceAtPreviousStage, final StageConfig previousStage, final AtomicBoolean acceptSchedulingChanges,
final Observer<Status> jobStatusObserver, final int stageNumToExecute, final int workerIndex, final int workerNumber,
ServiceLocator serviceLocator) {
logger.info("Watching for scheduling changes");

int connectionsPerEndpoint;
String connectionsPerEndpointStr =
ServiceRegistry.INSTANCE
.getPropertiesService()
.getStringValue("mantis.worker.connectionsPerEndpoint", "2");
if (connectionsPerEndpointStr != null && !connectionsPerEndpointStr.equals("2")) {
connectionsPerEndpoint = Integer.parseInt(connectionsPerEndpointStr);
} else {
connectionsPerEndpoint = 2;
}

//Observable<List<Endpoint>> schedulingUpdates = mantisMasterApi.schedulingChanges(jobId)
Observable<List<Endpoint>> schedulingUpdates = selfSchedulingInfo
.flatMap((Func1<JobSchedulingInfo, Observable<WorkerAssignments>>) schedulingChange -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private static void startSource(int index, int port, int workersAtNextStage, Sou
logger.debug("Creating source publisher on port " + port);
WorkerPublisherRemoteObservable publisher
= new WorkerPublisherRemoteObservable<>(port,
null, Observable.just(workersAtNextStage * numPartitions), null); // name is set to null, defaul
null, null); // name is set to null, default
// to start job
StageExecutors.executeSource(index, source, stage, publisher, context, stageWorkersObservable);
}
Expand All @@ -104,7 +104,7 @@ private static void startIntermediate(int[] previousStagePorts,
logger.debug("Creating intermediate publisher on port " + port);
WorkerPublisherRemoteObservable intermediatePublisher
= new WorkerPublisherRemoteObservable<>(port,
null, Observable.just(workersAtNextStage * numPartitions), null); // name is null for local
null, null); // name is null for local
StageExecutors.executeIntermediate(intermediateConsumer, stage, intermediatePublisher,
context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ public class WorkerPublisherRemoteObservable<T> implements WorkerPublisher<T> {
private String jobName;
private RouterFactory routerFactory;

public WorkerPublisherRemoteObservable(int serverPort,
String name, Observable<Integer> minConnectionsToSubscribe,
String jobName) {
this(serverPort, name, minConnectionsToSubscribe, jobName, new Routers());
public WorkerPublisherRemoteObservable(int serverPort, String name, String jobName) {
this(serverPort, name, jobName, new Routers());
}

public WorkerPublisherRemoteObservable(int serverPort,
String name, Observable<Integer> minConnectionsToSubscribe,
String jobName, RouterFactory routerFactory) {
public WorkerPublisherRemoteObservable(int serverPort, String name, String jobName, RouterFactory routerFactory) {
this.name = name;
this.serverPort = serverPort;
this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public void testExecuteSource() {
PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000, 9000);
int serverPort = portSelector.acquirePort();

WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null,
Observable.just(1), null);
WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null, null);
// execute source

BehaviorSubject<Integer> workersInStageOneObservable = BehaviorSubject.create(1);
Expand Down Expand Up @@ -152,8 +151,7 @@ public void call(Subscriber<? super EndpointChange> subscriber) {
};

WorkerConsumer consumer = new WorkerConsumerRemoteObservable(null, staticEndpoints);
WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null,
Observable.just(1), null);
WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null, null);
// execute source
StageExecutors.executeIntermediate(consumer, stages.get(1), producer,
new Context());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public void testExecuteSource() {
PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000, 9000);
int serverPort = portSelector.acquirePort();

WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null,
Observable.just(1), null);
WorkerPublisher producer = new WorkerPublisherRemoteObservable(serverPort, null, null);
// execute source
BehaviorSubject<Integer> workersInStageOneObservable = BehaviorSubject.create(1);
StageExecutors.executeSource(0, job.getSource(), stages.get(0), producer,
Expand Down Expand Up @@ -108,8 +107,7 @@ public void call(Subscriber<? super EndpointChange> subscriber) {
};

WorkerConsumer consumer = new WorkerConsumerRemoteObservable(null, staticEndpoints);
WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null,
Observable.just(1), null);
WorkerPublisher producer = new WorkerPublisherRemoteObservable(publishPort, null, null);
// execute intermediate, flatten results
StageExecutors.executeIntermediate(consumer, stages.get(1), producer,
new Context());
Expand Down