diff --git a/compliance/repository/src/test/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedServiceIntegrationTest.java b/compliance/repository/src/test/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedServiceIntegrationTest.java index 763d3c2a6d2..9b92fd657e3 100644 --- a/compliance/repository/src/test/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedServiceIntegrationTest.java +++ b/compliance/repository/src/test/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedServiceIntegrationTest.java @@ -295,7 +295,7 @@ public void test9_connectionHandling() throws Exception { .map(value -> vf.createStatement(iri("s1"), RDFS.LABEL, value)) .collect(Collectors.toList())); - ExecutorService executor = Executors.newFixedThreadPool(5); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); try { for (int i = 0; i < 5; i++) { executor.submit(() -> { diff --git a/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java b/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java index 43412f081f9..2ca031527af 100644 --- a/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java +++ b/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java @@ -164,18 +164,9 @@ public long getRetryInterval() { *--------------*/ public SharedHttpClientSessionManager() { - final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory(); - - ExecutorService threadPoolExecutor = Executors.newCachedThreadPool((Runnable runnable) -> { - Thread thread = backingThreadFactory.newThread(runnable); - thread.setName( - String.format("rdf4j-SharedHttpClientSessionManager-%d", threadCount.getAndIncrement())); - thread.setDaemon(true); - return thread; - }); - - Integer corePoolSize = Integer.getInteger(CORE_POOL_SIZE_PROPERTY, 1); - ((ThreadPoolExecutor) threadPoolExecutor).setCorePoolSize(corePoolSize); + ExecutorService threadPoolExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual() + .name("rdf4j-SharedHttpClientSessionManager-", threadCount.getAndIncrement()) + .factory()); this.executor = threadPoolExecutor; } diff --git a/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/MinimalContextNowTest.java b/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/MinimalContextNowTest.java index a4d5af4ebb5..99102c89d61 100644 --- a/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/MinimalContextNowTest.java +++ b/core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/MinimalContextNowTest.java @@ -49,7 +49,7 @@ public void testConcurrentAccessToNow() throws ExecutionException, InterruptedEx int numberOfIterations = 100; int numberOfThreads = 10; - ExecutorService executorService = Executors.newCachedThreadPool(); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); try { for (int i = 0; i < numberOfIterations; i++) { diff --git a/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/query/SPARQLOperation.java b/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/query/SPARQLOperation.java index f1e9b8ba114..fb134112375 100644 --- a/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/query/SPARQLOperation.java +++ b/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/query/SPARQLOperation.java @@ -35,7 +35,7 @@ @Deprecated public abstract class SPARQLOperation implements Operation { - private static final Executor executor = Executors.newCachedThreadPool(); + private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); protected HttpClient client; diff --git a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/benchmarks/BaseLockManagerBenchmark.java b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/benchmarks/BaseLockManagerBenchmark.java index cdae8228540..569be9dbdd8 100644 --- a/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/benchmarks/BaseLockManagerBenchmark.java +++ b/core/sail/api/src/test/java/org/eclipse/rdf4j/common/concurrent/locks/benchmarks/BaseLockManagerBenchmark.java @@ -33,7 +33,7 @@ public class BaseLockManagerBenchmark { public void setUp() { Logger root = (Logger) LoggerFactory.getLogger(ReadPrefReadWriteLockManager.class.getName()); root.setLevel(ch.qos.logback.classic.Level.ERROR); - executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + executorService = Executors.newVirtualThreadPerTaskExecutor(); } @TearDown(Level.Trial) diff --git a/core/sail/api/src/test/java/org/eclipse/rdf4j/sail/helpers/AbstractSailTest.java b/core/sail/api/src/test/java/org/eclipse/rdf4j/sail/helpers/AbstractSailTest.java index c1c64931f40..0d3b6312d58 100644 --- a/core/sail/api/src/test/java/org/eclipse/rdf4j/sail/helpers/AbstractSailTest.java +++ b/core/sail/api/src/test/java/org/eclipse/rdf4j/sail/helpers/AbstractSailTest.java @@ -101,7 +101,7 @@ public void testConcurrentAutoInit() throws Exception { CountDownLatch latch = new CountDownLatch(count); for (int i = 0; i < count; i++) { - new Thread(new SailGetConnectionTask(subject, latch)).start(); + Thread.ofVirtual().start(new SailGetConnectionTask(subject, latch)); } if (!latch.await(30, TimeUnit.SECONDS)) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index 02e7d71bf5d..531711a122e 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -64,7 +64,7 @@ class LmdbSailStore implements SailStore { private final ValueStore valueStore; - private final ExecutorService tripleStoreExecutor = Executors.newCachedThreadPool(); + private final ExecutorService tripleStoreExecutor = Executors.newVirtualThreadPerTaskExecutor(); private final CircularBuffer opQueue = new CircularBuffer<>(1024); private volatile Throwable tripleStoreException; private final AtomicBoolean running = new AtomicBoolean(false); diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java index eef34f93d1c..b4563e141a8 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java @@ -109,7 +109,7 @@ public void manyConcurrentTransactions() throws IOException { SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper( new NotifySailWrapper( new NotifySailWrapper(new NotifySailWrapper(new LmdbStore(temporaryFolder))))))); - ExecutorService executorService = Executors.newFixedThreadPool(10); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); try { diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java index 1e09c30169e..6aee01eb74f 100644 --- a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java @@ -773,7 +773,7 @@ public void testMultithreadedAdd() throws InterruptedException { final CountDownLatch endLatch = new CountDownLatch(numThreads); final Set exceptions = ConcurrentHashMap.newKeySet(); for (int i = 0; i < numThreads; i++) { - new Thread(new Runnable() { + Thread.ofVirtual().start(new Runnable() { private final long iterationCount = 10 + Math.round(random.nextDouble() * 100); @@ -791,7 +791,7 @@ public void run() { endLatch.countDown(); } } - }).start(); + }); } startLatch.countDown(); endLatch.await(); diff --git a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java index 06ab62708af..fe6ba4661c9 100644 --- a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java +++ b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java @@ -11,10 +11,12 @@ package org.eclipse.rdf4j.sail.memory.benchmark; import java.io.InputStream; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -32,6 +34,7 @@ public class BaseConcurrentBenchmark { Repository repository; private ExecutorService executorService; + private Semaphore semaphore; static InputStream getResourceAsStream(String filename) { return BaseConcurrentBenchmark.class.getClassLoader().getResourceAsStream(filename); @@ -42,7 +45,8 @@ public void setup() throws Exception { if (executorService != null) { executorService.shutdownNow(); } - executorService = Executors.newFixedThreadPool(8); + executorService = Executors.newVirtualThreadPerTaskExecutor(); + semaphore = new Semaphore(8); } @TearDown(Level.Trial) @@ -59,16 +63,21 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException { CountDownLatch latchDone = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { - executorService.submit(() -> { - try { - latch.await(); - runnable.run(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - latchDone.countDown(); - } - }); + semaphore.acquireUninterruptibly(); + try { + executorService.submit(() -> { + try { + latch.await(); + runnable.run(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + latchDone.countDown(); + } + }); + } finally { + semaphore.release(); + } } latch.countDown(); @@ -77,7 +86,12 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException { } Future submit(Runnable runnable) { - return executorService.submit(runnable); + semaphore.acquireUninterruptibly(); + try { + return executorService.submit(runnable); + } finally { + semaphore.release(); + } } Runnable getRunnable(CountDownLatch startSignal, RepositoryConnection connection, diff --git a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/model/MemStatementListTestIT.java b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/model/MemStatementListTestIT.java index c8594d67184..5938b529867 100644 --- a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/model/MemStatementListTestIT.java +++ b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/model/MemStatementListTestIT.java @@ -106,7 +106,7 @@ public void addMultipleThreads() throws ExecutionException, InterruptedException MemStatementList memStatementList = new MemStatementList(); - ExecutorService executorService = Executors.newCachedThreadPool(); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); try { List> collect = partition diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java index 277c6c498ec..558c257f1aa 100644 --- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java @@ -101,7 +101,7 @@ public void manyConcurrentTransactions() throws IOException { SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper( new NotifySailWrapper( new NotifySailWrapper(new NotifySailWrapper(new NativeStore(temporaryFolder))))))); - ExecutorService executorService = Executors.newFixedThreadPool(10); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); try { diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java index f2b75e528ed..c6bda70ad08 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSail.java @@ -252,6 +252,10 @@ public void setBaseSail(Sail baseSail) { @Experimental protected RevivableExecutorService getExecutorService() { return new RevivableExecutorService( + // Refactoring the below to Executors.newThreadPerTaskExecutor(r -> { + // Thread t = Thread.ofVirtual().factory().newThread(r); + // causes tests to hang forever + // (https://github.com/ponder-lab/rdf4j/actions/runs/10561618648/job/29257740868) () -> Executors.newFixedThreadPool(AVAILABLE_PROCESSORS, r -> { Thread t = Executors.defaultThreadFactory().newThread(r); diff --git a/core/sail/shacl/src/test/java/org/eclipse/rdf4j/sail/shacl/MultithreadedTest.java b/core/sail/shacl/src/test/java/org/eclipse/rdf4j/sail/shacl/MultithreadedTest.java index 89aea92cac9..8316cd3d00a 100644 --- a/core/sail/shacl/src/test/java/org/eclipse/rdf4j/sail/shacl/MultithreadedTest.java +++ b/core/sail/shacl/src/test/java/org/eclipse/rdf4j/sail/shacl/MultithreadedTest.java @@ -223,6 +223,8 @@ private void parallelTest(List> list, IsolationLevels isolatio Random r = new Random(52465534); + // Refactoring this to Executors.newVirtualThreadPerTaskExecutor() causes tests to hang forever + // (https://github.com/ponder-lab/rdf4j/actions/runs/10567565368/job/29276778845) ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); try { @@ -467,6 +469,8 @@ private void runValidationFailuresTest(Sail sail, IsolationLevels isolationLevel deadlockDetectionThread.setDaemon(true); deadlockDetectionThread.start(); + // Refactoring this to Executors.newVirtualThreadPerTaskExecutor() causes tests to hang forever + // (https://github.com/ponder-lab/rdf4j/actions/runs/10568156440/job/29278593171) executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); Utils.loadShapeData(repository, "complexBenchmark/shacl.trig"); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java index 263ace78bf0..b2e25919efd 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java @@ -92,6 +92,9 @@ public FederationManager() { public void init(FedX federation, FederationContext federationContext) { this.federation = federation; this.federationContext = federationContext; + // Refactoring this to Executors.newThreadPerTaskExecutor(new NamingThreadFactory("FedX Executor")); + // and refactoring NamingThreadFactory to use virtual threads causes ServicesTest to hang forever + // (https://github.com/ponder-lab/rdf4j/actions/runs/10239404923/job/28324948852?pr=2) this.executor = Executors.newCachedThreadPool(new NamingThreadFactory("FedX Executor")); updateFederationType(); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index f677ca46ea4..32dd3dbae3e 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -118,6 +118,10 @@ public int getNumberOfTasks() { private ExecutorService createExecutorService() { + // Refactoring this to ExecutorService executor = Executors.newThreadPerTaskExecutor(new + // NamingThreadFactory(name)) + // and refactoring NamingThreadFactory to use virtual threads causes ServicesTest to hang forever + // (https://github.com/ponder-lab/rdf4j/actions/runs/10239404923/job/28324948852?pr=2) ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue, new NamingThreadFactory(name)); executor.allowCoreThreadTimeOut(true); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/NamingThreadFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/NamingThreadFactory.java index 2db262875a6..05745afee71 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/NamingThreadFactory.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/NamingThreadFactory.java @@ -29,6 +29,10 @@ public NamingThreadFactory(String baseName) { @Override public Thread newThread(Runnable r) { + // Refactoring this to Thread t = Thread.ofVirtual().name(baseName + "-", + // nextThreadId.incrementAndGet()).unstarted(r); + // causes ServicesTest to hang forever + // (https://github.com/ponder-lab/rdf4j/actions/runs/10239404923/job/28324948852?pr=2) Thread t = Executors.defaultThreadFactory().newThread(r); t.setName(baseName + "-" + nextThreadId.incrementAndGet()); return t; diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RemoteRepositoryTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RemoteRepositoryTest.java index 1782e30d54a..3cec035b259 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RemoteRepositoryTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RemoteRepositoryTest.java @@ -40,7 +40,7 @@ public class RemoteRepositoryTest { */ public static void main(String[] args) throws Exception { - ExecutorService executor = Executors.newFixedThreadPool(30); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); Repository repo = new HTTPRepository("http://10.212.10.29:8081/openrdf-sesame", "drugbank"); diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RepositoryPerformance.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RepositoryPerformance.java index 67d7ac1af82..e55f8617027 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RepositoryPerformance.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/RepositoryPerformance.java @@ -55,7 +55,7 @@ private static abstract class PerformanceBase { private static final int MAX_INSTANCES = Integer.MAX_VALUE; private static final int N_QUERIES = 100; - private final ExecutorService executor = Executors.newFixedThreadPool(30); + private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); private final IRI type; diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/SparqlRepositoryTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/SparqlRepositoryTest.java index 3f9ac1b0957..efbda858733 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/SparqlRepositoryTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/performance/SparqlRepositoryTest.java @@ -29,7 +29,7 @@ public class SparqlRepositoryTest { public static void main(String[] args) throws Exception { - ExecutorService executor = Executors.newFixedThreadPool(20); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); SPARQLRepository repo = new SPARQLRepository("http://dbpedia.org/sparql"); repo.init(); diff --git a/tools/workbench/src/main/java/org/eclipse/rdf4j/workbench/commands/SummaryServlet.java b/tools/workbench/src/main/java/org/eclipse/rdf4j/workbench/commands/SummaryServlet.java index fe50b00d616..82979a7254d 100644 --- a/tools/workbench/src/main/java/org/eclipse/rdf4j/workbench/commands/SummaryServlet.java +++ b/tools/workbench/src/main/java/org/eclipse/rdf4j/workbench/commands/SummaryServlet.java @@ -34,7 +34,7 @@ public class SummaryServlet extends TransformationServlet { - private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); private static final Logger LOGGER = LoggerFactory.getLogger(SummaryServlet.class);