Skip to content

Commit

Permalink
GH-5197: preparation for supporting fair sub-query execution in FedX
Browse files Browse the repository at this point in the history
This change adds preparational infrastructure for having different
implementations of schedulers. Configuration is here prepared by means
of defining a "SchedulerFactory" interface with a default implementation
aside (which essentially mimics the current behavior).

Note that for ease of development some aspects of
ControlledWorkerScheduler are made accessible to sub-classes. The idea
is that in the end version there is an abstract scheduler class
providing shared functionality and different implementation (e.g. the
current FIFO one and a fair implementation)
  • Loading branch information
aschwarte10 committed Nov 11, 2024
1 parent ab02413 commit d74e6be
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint;
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXException;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {

private FederationEvaluationStrategyFactory strategyFactory;

private SchedulerFactory schedulerFactory;

private WriteStrategyFactory writeStrategyFactory;

private File dataDir;
Expand Down Expand Up @@ -96,6 +100,22 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
this.strategyFactory = strategyFactory;
}

/* package */ SchedulerFactory getSchedulerFactory() {
if (schedulerFactory == null) {
schedulerFactory = DefaultSchedulerFactory.INSTANCE;
}
return schedulerFactory;
}

/**
* Set the {@link SchedulerFactory}. Can only be done before initialization of the federation
*
* @param schedulerFactory the {@link SchedulerFactory}
*/
public void setSchedulerFactory(SchedulerFactory schedulerFactory) {
this.schedulerFactory = schedulerFactory;
}

/**
*
* @param writeStrategyFactory the {@link WriteStrategyFactory}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
Expand Down Expand Up @@ -118,26 +119,28 @@ public void reset() {
log.debug("Scheduler for join and union are reset.");
}

SchedulerFactory schedulerFactory = federation.getSchedulerFactory();

Optional<TaskWrapper> taskWrapper = federationContext.getConfig().getTaskWrapper();
if (joinScheduler != null) {
joinScheduler.abort();
}
joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(),
"Join Scheduler");
joinScheduler = schedulerFactory.createJoinScheduler(federationContext,
federationContext.getConfig().getJoinWorkerThreads());
taskWrapper.ifPresent(joinScheduler::setTaskWrapper);

if (unionScheduler != null) {
unionScheduler.abort();
}
unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(),
"Union Scheduler");
unionScheduler = schedulerFactory.createUnionScheduler(federationContext,
federationContext.getConfig().getUnionWorkerThreads());
taskWrapper.ifPresent(unionScheduler::setTaskWrapper);

if (leftJoinScheduler != null) {
leftJoinScheduler.abort();
}
leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(),
"Left Join Scheduler");
leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext,
federationContext.getConfig().getLeftJoinWorkerThreads());
taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -42,7 +43,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw

private final ExecutorService executor;

private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
// Note: initialized in #createExecutorService
protected BlockingQueue<Runnable> _taskQueue;

private final int nWorkers;
private final String name;
Expand All @@ -57,7 +59,7 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
public ControlledWorkerScheduler(int nWorkers, String name) {
this.nWorkers = nWorkers;
this.name = name;
this.executor = createExecutorService();
this.executor = createExecutorService(nWorkers, name);
}

/**
Expand Down Expand Up @@ -112,13 +114,25 @@ public int getTotalNumberOfWorkers() {
return nWorkers;
}

@Deprecated(forRemoval = true) // currently unused and this class is internal
public int getNumberOfTasks() {
return _taskQueue.size();
}

private ExecutorService createExecutorService() {
/**
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
*
* @param nWorkers the number of workers in the thread pool
* @param name the base name for threads in the pool
* @return
*/
protected ExecutorService createExecutorService(int nWorkers, String name) {

// use a LinkedBlockingQueue by default
this._taskQueue = new LinkedBlockingQueue<>();

ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue,
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
new NamingThreadFactory(name));
executor.allowCoreThreadTimeOut(true);
return executor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.query.BindingSet;

/**
* The default {@link SchedulerFactory}
*/
public class DefaultSchedulerFactory implements SchedulerFactory {

public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory();

@Override
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Join Scheduler");
}

@Override
public ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Union Scheduler");
}

@Override
public ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Left Join Scheduler");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.query.BindingSet;

/**
* Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background
*
* @see DefaultSchedulerFactory
* @author Andreas Schwarte
*/
public interface SchedulerFactory {

/**
* Create a {@link ControlledWorkerScheduler} for joins
*
* @param federationContext
* @param nWorkers
* @return
*/
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for unions
*
* @param federationContext
* @param nWorkers
* @return
*/
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for left joins
*
* @param federationContext
* @param nWorkers
* @return
*/
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
}

0 comments on commit d74e6be

Please sign in to comment.