From d74e6be2e9c4bc17dc739b8c445c965cd9b5c4d4 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Mon, 11 Nov 2024 14:34:56 +0100 Subject: [PATCH] GH-5197: preparation for supporting fair sub-query execution in FedX This change adds preparatory infrastructure for having different implementations of schedulers. Configuration is prepared by defining a "SchedulerFactory" interface alongside a default implementation (which essentially mimics the current behavior). Note that for ease of development some aspects of ControlledWorkerScheduler are made accessible to sub-classes. The idea is that in the final version there is an abstract scheduler class providing shared functionality and different implementations (e.g. the current FIFO one and a fair implementation). --- .../org/eclipse/rdf4j/federated/FedX.java | 20 ++++++++ .../rdf4j/federated/FederationManager.java | 15 +++--- .../concurrent/ControlledWorkerScheduler.java | 22 ++++++-- .../concurrent/DefaultSchedulerFactory.java | 44 ++++++++++++++++ .../concurrent/SchedulerFactory.java | 50 +++++++++++++++++++ 5 files changed, 141 insertions(+), 10 deletions(-) create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java index 568ae96d56a..f8f7d226607 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java @@ -22,6 +22,8 @@ import org.eclipse.rdf4j.federated.endpoint.Endpoint; import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint; import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory; +import 
org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory; +import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.FedXException; import org.eclipse.rdf4j.federated.exception.FedXRuntimeException; @@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient { private FederationEvaluationStrategyFactory strategyFactory; + private SchedulerFactory schedulerFactory; + private WriteStrategyFactory writeStrategyFactory; private File dataDir; @@ -96,6 +100,22 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory this.strategyFactory = strategyFactory; } + /* package */ SchedulerFactory getSchedulerFactory() { + if (schedulerFactory == null) { + schedulerFactory = DefaultSchedulerFactory.INSTANCE; + } + return schedulerFactory; + } + + /** + * Set the {@link SchedulerFactory}. Can only be done before initialization of the federation + * + * @param schedulerFactory the {@link SchedulerFactory} + */ + public void setSchedulerFactory(SchedulerFactory schedulerFactory) { + this.schedulerFactory = schedulerFactory; + } + /** * * @param writeStrategyFactory the {@link WriteStrategyFactory} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java index 263ace78bf0..e2b34c8b708 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java @@ -26,6 +26,7 @@ import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory; import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler; +import 
org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory; import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion; @@ -118,26 +119,28 @@ public void reset() { log.debug("Scheduler for join and union are reset."); } + SchedulerFactory schedulerFactory = federation.getSchedulerFactory(); + Optional taskWrapper = federationContext.getConfig().getTaskWrapper(); if (joinScheduler != null) { joinScheduler.abort(); } - joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(), - "Join Scheduler"); + joinScheduler = schedulerFactory.createJoinScheduler(federationContext, + federationContext.getConfig().getJoinWorkerThreads()); taskWrapper.ifPresent(joinScheduler::setTaskWrapper); if (unionScheduler != null) { unionScheduler.abort(); } - unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(), - "Union Scheduler"); + unionScheduler = schedulerFactory.createUnionScheduler(federationContext, + federationContext.getConfig().getUnionWorkerThreads()); taskWrapper.ifPresent(unionScheduler::setTaskWrapper); if (leftJoinScheduler != null) { leftJoinScheduler.abort(); } - leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(), - "Left Join Scheduler"); + leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext, + federationContext.getConfig().getLeftJoinWorkerThreads()); taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index 1060e86a2de..da2ef0d334a 100644 --- 
a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -11,6 +11,7 @@ package org.eclipse.rdf4j.federated.evaluation.concurrent; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -42,7 +43,8 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw private final ExecutorService executor; - private final LinkedBlockingQueue _taskQueue = new LinkedBlockingQueue<>(); + // Note: initialized in #createExecutorService + protected BlockingQueue _taskQueue; private final int nWorkers; private final String name; @@ -57,7 +59,7 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw public ControlledWorkerScheduler(int nWorkers, String name) { this.nWorkers = nWorkers; this.name = name; - this.executor = createExecutorService(); + this.executor = createExecutorService(nWorkers, name); } /** @@ -112,13 +114,25 @@ public int getTotalNumberOfWorkers() { return nWorkers; } + @Deprecated(forRemoval = true) // currently unused and this class is internal public int getNumberOfTasks() { return _taskQueue.size(); } - private ExecutorService createExecutorService() { + /** + * Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The + * default implementation creates a thread pool with a {@link LinkedBlockingQueue}. 
+ * + * @param nWorkers the number of workers in the thread pool + * @param name the base name for threads in the pool + * @return + */ + protected ExecutorService createExecutorService(int nWorkers, String name) { + + // use a LinkedBlockingQueue by default + this._taskQueue = new LinkedBlockingQueue<>(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue, + ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue, new NamingThreadFactory(name)); executor.allowCoreThreadTimeOut(true); return executor; diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java new file mode 100644 index 00000000000..dd063b6191a --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.concurrent; + +import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * The default {@link SchedulerFactory}: each factory method returns a new {@link ControlledWorkerScheduler} with the given number of workers and a fixed scheduler name; the {@code federationContext} argument is not used + */ +public class DefaultSchedulerFactory implements SchedulerFactory { + + public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory(); + + @Override + public ControlledWorkerScheduler createJoinScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Join Scheduler"); + } + + @Override + public ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Union Scheduler"); + } + + @Override + public ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Left Join Scheduler"); + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java new file mode 100644 index 00000000000..0e53570d4ab --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java @@ -0,0 +1,50 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.concurrent; + +import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background + * + * @see DefaultSchedulerFactory + * @author Andreas Schwarte + */ +public interface SchedulerFactory { + + /** + * Create a {@link ControlledWorkerScheduler} for joins + * + * @param federationContext the {@link FederationContext} of the federation requesting the scheduler + * @param nWorkers the number of workers of the scheduler + * @return the {@link ControlledWorkerScheduler} to use for joins + */ + ControlledWorkerScheduler createJoinScheduler(FederationContext federationContext, int nWorkers); + + /** + * Create a {@link ControlledWorkerScheduler} for unions + * + * @param federationContext the {@link FederationContext} of the federation requesting the scheduler + * @param nWorkers the number of workers of the scheduler + * @return the {@link ControlledWorkerScheduler} to use for unions + */ + ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, int nWorkers); + + /** + * Create a {@link ControlledWorkerScheduler} for left joins + * + * @param federationContext the {@link FederationContext} of the federation requesting the scheduler + * @param nWorkers the number of workers of the scheduler + * @return the {@link ControlledWorkerScheduler} to use for left joins + */ + ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, int nWorkers); +}