Skip to content

Commit

Permalink
GH-5197: javadoc refinements + smaller initialization changes
Browse files Browse the repository at this point in the history
- for minor version compatibility the type of the "_taskQueue" field in
the scheduler cannot be changed (to non-final). Hence, for now we use a
dedicated protected initialization method. In the future (next major
release) the idea is to leave the queue entirely managed by the executor
service.
- refinements and clarifications to the javadoc
  • Loading branch information
aschwarte10 committed Nov 12, 2024
1 parent d74e6be commit ddcadc0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {

private FederationEvaluationStrategyFactory strategyFactory;

private SchedulerFactory schedulerFactory;
private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE;

private WriteStrategyFactory writeStrategyFactory;

Expand Down Expand Up @@ -101,9 +101,6 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
}

/* package */ SchedulerFactory getSchedulerFactory() {
if (schedulerFactory == null) {
schedulerFactory = DefaultSchedulerFactory.INSTANCE;
}
return schedulerFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw

private final ExecutorService executor;

// Note: initialized in #createExecutorService
protected BlockingQueue<Runnable> _taskQueue;
// TODO: in the next major version of RDF4J this final field should be removed.
// Initialization of the executor service should managed the details
private final BlockingQueue<Runnable> _taskQueue;

private final int nWorkers;
private final String name;
Expand All @@ -59,6 +60,7 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
public ControlledWorkerScheduler(int nWorkers, String name) {
this.nWorkers = nWorkers;
this.name = name;
this._taskQueue = createBlockingQueue();
this.executor = createExecutorService(nWorkers, name);
}

Expand Down Expand Up @@ -114,24 +116,33 @@ public int getTotalNumberOfWorkers() {
return nWorkers;
}

@Deprecated(forRemoval = true) // currently unused and this class is internal
@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal
public int getNumberOfTasks() {
return _taskQueue.size();
}

/**
* Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a
* {@link LinkedBlockingQueue}.
*
* @return
*/
protected BlockingQueue<Runnable> createBlockingQueue() {
return new LinkedBlockingQueue<>();
}

/**
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
*
* The thread pool should be configured to terminate idle threads after a period of time (default: 60s)
*
* @param nWorkers the number of workers in the thread pool
* @param name the base name for threads in the pool
* @return
*/
protected ExecutorService createExecutorService(int nWorkers, String name) {

// use a LinkedBlockingQueue by default
this._taskQueue = new LinkedBlockingQueue<>();

ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
new NamingThreadFactory(name));
executor.allowCoreThreadTimeOut(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask;
import org.eclipse.rdf4j.query.BindingSet;

/**
Expand All @@ -22,16 +26,19 @@
public interface SchedulerFactory {

/**
* Create a {@link ControlledWorkerScheduler} for joins
* Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind
* joins)
*
* @param federationContext
* @param nWorkers
* @return
* @see ControlledWorkerBindJoin
* @see ParallelBoundJoinTask
*/
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for unions
* Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel)
*
* @param federationContext
* @param nWorkers
Expand All @@ -40,11 +47,14 @@ public interface SchedulerFactory {
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for left joins
* Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind
* joins, i.e. OPTIONAL)
*
* @param federationContext
* @param nWorkers
* @return
* @see ControlledWorkerBindLeftJoin
* @see ParallelBindLeftJoinTask
*/
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
}

0 comments on commit ddcadc0

Please sign in to comment.