diff --git a/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java b/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java index f2d96ecc435..31d140b3889 100644 --- a/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java +++ b/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java @@ -289,6 +289,21 @@ public class LuceneSail extends NotifyingSailWrapper { */ public static final String LUCENE_RAMDIR_KEY = "useramdir"; + /** + * Set the key "fsyncInterval=<t>" as sail parameter to configure the interval in milliseconds in which fsync + * is called on the Lucene index. Set to 0 or a negative value to call fsync synchronously after each operation. + * Default is 0. Setting this parameter to a positive value will improve performance for frequent writes, but may + * cause the loss of the last few operations in case of a crash. + */ + public static final String FSYNC_INTERVAL_KEY = "fsyncInterval"; + + /** + * Set the key "fsyncMaxPendingFiles=<n>" as sail parameter to configure the maximum number of files pending + * to be fsynced. When this number is reached, a fsync is forced to limit memory usage. Default is 5000. This + * parameter only has an effect when {@link #FSYNC_INTERVAL_KEY} is set to a positive value. + */ + public static final String FSYNC_MAX_PENDING_FILES_KEY = "fsyncMaxPendingFiles"; + /** * Set the key "defaultNumDocs=<n>" as sail parameter to limit the maximum number of documents to return from * a search query. The default is to return all documents. NB: this may involve extra cost for some SearchIndex diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java new file mode 100644 index 00000000000..81732512c58 --- /dev/null +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java @@ -0,0 +1,247 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.eclipse.rdf4j.sail.SailException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper around a Lucene Directory that batches sync and metadata sync calls to be executed at a fixed interval. + * + * @author Piotr Sowiński + */ +class DelayedSyncDirectoryWrapper extends FilterDirectory { + + final private Logger logger = LoggerFactory.getLogger(getClass()); + + final private ScheduledThreadPoolExecutor scheduler; + + final private AtomicBoolean needsMetadataSync = new AtomicBoolean(false); + + final private AtomicReference lastSyncThrowable = new AtomicReference<>(null); + + final private HashSet pendingSyncs = new HashSet<>(); + + final private int maxPendingSyncs; + + private final Object syncMonitor = new Object(); + + private boolean closed = false; + + /** + * Creates a new instance of LuceneDirectoryWrapper. + * + * @param in the underlying directory + * @param fsyncInterval the interval in milliseconds writes after which a fsync is performed + * @param maxPendingSyncs the maximum number of pending syncs to accumulate before forcing a sync + */ + DelayedSyncDirectoryWrapper(Directory in, long fsyncInterval, int maxPendingSyncs) { + super(in); + + assert fsyncInterval > 0; + assert maxPendingSyncs > 0; + + this.maxPendingSyncs = maxPendingSyncs; + + // Use a daemon thread so the scheduler does not prevent JVM shutdown. + this.scheduler = new ScheduledThreadPoolExecutor(1, r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("rdf4j-lucene-sync-" + t.getId()); + return t; + }); + + // Help GC by removing cancelled tasks from the queue. + this.scheduler.setRemoveOnCancelPolicy(true); + + this.scheduler.scheduleAtFixedRate( + () -> { + try { + doSync(); + } catch (Throwable e) { + // keep scheduling even if Errors occur + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index", e); + // Throwable is recorded and rethrown on next sync()/syncMetaData() call by checkException(). + try { + Thread.sleep(10); // slight throttle to avoid busy looping + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + }, + fsyncInterval, + fsyncInterval, + TimeUnit.MILLISECONDS + ); + } + + private void doSync() throws IOException { + logger.debug("Performing periodic sync of Lucene index"); + List toSync; + synchronized (pendingSyncs) { + toSync = new ArrayList<>(pendingSyncs); + pendingSyncs.clear(); + } + + boolean metaRequestedInitial = this.needsMetadataSync.get(); + boolean metaToProcess = false; + + try { + if (toSync.isEmpty() && !metaRequestedInitial) { + logger.debug("Nothing to sync"); + // Nothing to sync + return; + } + + synchronized (syncMonitor) { + // Process metadata first if requested + metaToProcess = this.needsMetadataSync.getAndSet(false); + if (metaToProcess) { + try { + logger.debug("Syncing metadata"); + super.syncMetaData(); + } catch (Throwable e) { + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index metadata", + e); + throw e; + } + } + + if (!toSync.isEmpty()) { + try { + logger.debug("Syncing files"); + super.sync(toSync); + } catch (Throwable e) { + // Lucene files may be merged/removed between scheduling and sync. + // Treat missing files as benign and attempt per-file sync, ignoring those missing. + if (e instanceof java.nio.file.NoSuchFileException || e instanceof FileNotFoundException) { + for (String name : toSync) { + try { + super.sync(Collections.singleton(name)); + } catch (java.nio.file.NoSuchFileException | FileNotFoundException logged) { +// logger.warn( +// "Could not sync file {},it seems to have been removed before it could be synced. Probably fine.", +// name, logged); + throw logged; + } catch (Throwable t) { + logger.error(t.getClass().getSimpleName() + + " during a periodic sync of Lucene index files (per-file)", t); + throw t; + } + } + // Consider sync successful when only missing files were encountered. + } else { + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index files", + e); + throw e; + } + } + } + } + } catch (Throwable t) { + lastSyncThrowable.set(t); + synchronized (pendingSyncs) { + pendingSyncs.addAll(toSync); + if (metaToProcess) { + needsMetadataSync.set(true); + } + } + throw t; + } + + } + + @Override + public void sync(Collection names) throws IOException { + checkException(); + if (closed) { + throw new SailException("DelayedSyncDirectoryWrapper is closed"); + } + + boolean doImmediateSync = false; + synchronized (pendingSyncs) { + pendingSyncs.addAll(names); + if (pendingSyncs.size() >= maxPendingSyncs) { + // If we have accumulated too many pending syncs, do a sync right away + // to avoid excessive memory usage + doImmediateSync = true; + } + } + if (doImmediateSync) { + doSync(); + } + } + + @Override + public void syncMetaData() throws IOException { + // Request a metadata sync even if a previous error is pending + needsMetadataSync.set(true); + checkException(); + if (closed) { + throw new SailException("DelayedSyncDirectoryWrapper is closed"); + } + } + + private void checkException() throws IOException { + final Throwable t = lastSyncThrowable.getAndSet(null); + if (t != null) { + // Rethrow the last exception if there was one. + // This will fail the current transaction, and not the one that caused the original exception. + // But there is no other way to notify the caller of the error, as the sync is done asynchronously. + + if (t instanceof IOException) { + throw ((IOException) t); + } else { + throw new SailException(t); + } + } + } + + @Override + public void close() throws IOException { + closed = true; + + // Finish the current sync task, if in progress and then shut down + try { + scheduler.shutdown(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + logger.error("Failed to shut down Lucene directory sync scheduler within 10s"); + throw new SailException("Failed to shut down Lucene directory sync scheduler within 10s"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); + } finally { + // Do a final sync of any remaining files + try { + doSync(); + } finally { + super.close(); + } + } + } +} diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java index 5999a91cbe8..1b80da33bcc 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -227,6 +226,23 @@ protected Directory createDirectory(Properties parameters) throws IOException { throw new IOException("No luceneIndex set, and no '" + LuceneSail.LUCENE_DIR_KEY + "' or '" + LuceneSail.LUCENE_RAMDIR_KEY + "' parameter given. "); } + long fsyncInterval = 0; + int maxPendingSyncs = 5000; + try { + var param = parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY, "0"); + fsyncInterval = Long.parseLong(param); + } catch (NumberFormatException e) { + logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_INTERVAL_KEY, e.getMessage()); + } + try { + var param = parameters.getProperty(LuceneSail.FSYNC_MAX_PENDING_FILES_KEY, "5000"); + maxPendingSyncs = Integer.parseInt(param); + } catch (NumberFormatException e) { + logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_MAX_PENDING_FILES_KEY, e.getMessage()); + } + if (fsyncInterval > 0) { + dir = new DelayedSyncDirectoryWrapper(dir, fsyncInterval, maxPendingSyncs); + } return dir; } @@ -385,10 +401,18 @@ public void shutDown() throws IOException { } } finally { try { - IndexWriter toCloseIndexWriter = indexWriter; - indexWriter = null; - if (toCloseIndexWriter != null) { - toCloseIndexWriter.close(); + try { + IndexWriter toCloseIndexWriter = indexWriter; + indexWriter = null; + if (toCloseIndexWriter != null) { + toCloseIndexWriter.close(); + } + } finally { + if (directory != null) { + // Close the directory -- if asynchronous fsync is used, this will clean + // up the scheduler thread too. + directory.close(); + } } } finally { if (!exceptions.isEmpty()) { diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java new file mode 100644 index 00000000000..d0423b8cdb0 --- /dev/null +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java @@ -0,0 +1,314 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.RAMDirectory; +import org.junit.jupiter.api.Test; + +/** + * @author Piotr Sowiński + */ +public class DelayedSyncDirectoryWrapperTest { + + private static final class TrackingDirectory extends FilterDirectory { + private AtomicInteger syncCount = new AtomicInteger(0); + private AtomicInteger metaSyncCount = new AtomicInteger(0); + private AtomicInteger closeCount = new AtomicInteger(0); + + TrackingDirectory(Directory in) { + super(in); + } + + @Override + public void sync(Collection names) throws IOException { + syncCount.getAndIncrement(); + super.sync(names); + } + + @Override + public void syncMetaData() throws IOException { + metaSyncCount.getAndIncrement(); + super.syncMetaData(); + } + + @Override + public void close() throws IOException { + closeCount.getAndIncrement(); + super.close(); + } + + public int getSyncCount() { + return syncCount.get(); + } + + public int getMetaSyncCount() { + return metaSyncCount.get(); + } + + public int getCloseCount() { + return closeCount.get(); + } + } + + @Test + public void testSyncData() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file1", "file2")); + + // The sync should be delayed, so the count should still be 0 + assertEquals(0, dir.getSyncCount()); + + // Wait for more than the fsync interval to allow the scheduled task to run + waitFor(700); + assertEquals(1, dir.getSyncCount()); + // Meta sync should still be 0 + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.close(); + + // No additional syncs should have occurred after close + assertEquals(1, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testSyncMetaData() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.syncMetaData(); + + // The meta sync should be delayed, so the count should still be 0 + assertEquals(0, dir.getMetaSyncCount()); + + // Wait for more than the fsync interval to allow the scheduled task to run + waitFor(700); + assertEquals(1, dir.getMetaSyncCount()); + // Regular sync should still be 0 + assertEquals(0, dir.getSyncCount()); + + delayedDir.close(); + + // No additional syncs should have occurred after close + assertEquals(0, dir.getSyncCount()); + assertEquals(1, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testSyncMixed() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file1", "file2")); + delayedDir.sync(List.of("file2", "file456")); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + + // The syncs should be delayed, so the counts should still be 0 + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + // Wait for more than the fsync interval to allow the scheduled task to run + waitFor(700); + assertEquals(1, dir.getSyncCount()); + assertEquals(1, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file2", "file456")); + delayedDir.syncMetaData(); + + waitFor(700); + assertEquals(2, dir.getSyncCount()); + assertEquals(2, dir.getMetaSyncCount()); + + // Wait again to ensure no extra syncs occur + waitFor(700); + assertEquals(2, dir.getSyncCount()); + assertEquals(2, dir.getMetaSyncCount()); + + delayedDir.close(); + + // No additional syncs should have occurred after close + assertEquals(2, dir.getSyncCount()); + assertEquals(2, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testSyncMixed_afterClose() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file1", "file2")); + delayedDir.sync(List.of("file2", "file456")); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.close(); + + // The syncs should be executed on close + assertEquals(1, dir.getSyncCount()); + assertEquals(1, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testCloseOnIndexShutDown() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + final var index = new LuceneIndex(delayedDir, new StandardAnalyzer()); + assertEquals(0, dir.getCloseCount()); + + index.shutDown(); + + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testRethrowLastSyncException() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + final var dir = new FilterDirectory(new RAMDirectory()) { + @Override + public void sync(Collection names) throws IOException { + try { + throw new IOException("Simulated IO exception during sync"); + } finally { + latch.countDown(); + } + } + + @Override + public void syncMetaData() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + throw new IOException("Simulated IO exception during syncMetaData"); + } finally { + latch2.countDown(); + } + } + }; + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 300, 100); + + // This should not throw immediately + delayedDir.sync(List.of("file1", "file2")); + latch.await(); + boolean exceptionThrown = false; + for (int i = 0; i < 10; i++) { + Thread.sleep(10); + try { + delayedDir.syncMetaData(); + } catch (IOException e) { + if (!"Simulated IO exception during sync".equals(e.getMessage())) { + if (i < 9) { + continue; + } + } + // The exception from the previous sync should be rethrown here + assertEquals("Simulated IO exception during sync", e.getMessage()); + exceptionThrown = true; + } + + } + if (!exceptionThrown) { + fail("Expected IOException was not thrown"); + } + + latch2.await(); + + exceptionThrown = false; + for (int i = 0; i < 10; i++) { + Thread.sleep(10); + try { + delayedDir.sync(List.of("file3")); + } catch (IOException e) { + if (!"Simulated IO exception during syncMetaData".equals(e.getMessage())) { + if (i < 9) { + continue; + } + } + // The exception from the previous sync should be rethrown here + assertEquals("Simulated IO exception during syncMetaData", e.getMessage()); + exceptionThrown = true; + } + } + + if (!exceptionThrown) { + fail("Expected IOException was not thrown"); + } + + } + + @Test + public void testSyncIfOverSyncLimit() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper( + dir, + 100_000, // Large interval to prevent scheduled syncs during the test + 5 // Low max pending syncs to trigger sync quickly + ); + + assertEquals(0, dir.getSyncCount()); + delayedDir.sync(List.of("file1", "file2", "file3", "file4")); + // Still no sync should have occurred, 4 < 5 + assertEquals(0, dir.getSyncCount()); + // Sync the same files again, should still be 4 unique files + delayedDir.sync(List.of("file1", "file2", "file3", "file4")); + assertEquals(0, dir.getSyncCount()); + // Sync one more file, should trigger the sync as we now have 5 unique files + delayedDir.sync(List.of("file5")); + assertEquals(1, dir.getSyncCount()); + } + + private void waitFor(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java new file mode 100644 index 00000000000..72276cae57a --- /dev/null +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java @@ -0,0 +1,58 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.store.NIOFSDirectory; +import org.eclipse.rdf4j.sail.lucene.LuceneSail; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Test to verify that when fsync interval is set, the index uses DelayedSyncDirectoryWrapper. It also checks if the + * index still works correctly while using the wrapper. + * + * @author Piotr Sowiński + */ +public class LuceneDelayedFsyncTest extends AbstractGenericLuceneTest { + + @TempDir + public File dataDir; + + private LuceneIndex index; + + @Override + protected void configure(LuceneSail sail) throws IOException { + index = new LuceneIndex(new NIOFSDirectory(dataDir.toPath()), new StandardAnalyzer()); + var params = new Properties(); + params.setProperty(LuceneSail.FSYNC_INTERVAL_KEY, "5000"); // 5 seconds + params.setProperty(LuceneSail.LUCENE_DIR_KEY, dataDir.getAbsolutePath()); + try { + index.initialize(params); + } catch (Exception e) { + throw new RuntimeException(e); + } + sail.setLuceneIndex(index); + } + + @Test + public void testIndexSettings() { + assertNotNull(index); + assertThat(index.getDirectory()).isInstanceOf(DelayedSyncDirectoryWrapper.class); + } +}