Skip to content

Commit ed3e249

Browse files
committed
GH-5291 Add asynchronous fsync to LuceneSail
1 parent 18f1bef commit ed3e249

File tree

5 files changed

+498
-5
lines changed

5 files changed

+498
-5
lines changed

core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,21 @@ public class LuceneSail extends NotifyingSailWrapper {
289289
*/
290290
public static final String LUCENE_RAMDIR_KEY = "useramdir";
291291

292+
/**
293+
* Set the key "fsyncInterval=<t>" as sail parameter to configure the interval in milliseconds in which fsync
294+
* is called on the Lucene index. Set to 0 or a negative value to call fsync synchronously after each operation.
295+
* Default is 0. Setting this parameter to a positive value will improve performance for frequent writes, but may
296+
* cause the loss of the last few operations in case of a crash.
297+
*/
298+
public static final String FSYNC_INTERVAL_KEY = "fsyncInterval";
299+
300+
/**
301+
* Set the key "fsyncMaxPendingFiles=<n>" as sail parameter to configure the maximum number of files pending
302+
* to be fsynced. When this number is reached, a fsync is forced to limit memory usage. Default is 5000. This
303+
* parameter only has an effect when {@link #FSYNC_INTERVAL_KEY} is set to a positive value.
304+
*/
305+
public static final String FSYNC_MAX_PENDING_FILES_KEY = "fsyncMaxPendingFiles";
306+
292307
/**
293308
* Set the key "defaultNumDocs=<n>" as sail parameter to limit the maximum number of documents to return from
294309
* a search query. The default is to return all documents. NB: this may involve extra cost for some SearchIndex
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.sail.lucene.impl;
12+
13+
import java.io.IOException;
14+
import java.util.ArrayList;
15+
import java.util.Collection;
16+
import java.util.HashSet;
17+
import java.util.List;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import org.apache.lucene.store.Directory;
25+
import org.apache.lucene.store.FilterDirectory;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/**
30+
* Wrapper around a Lucene Directory that batches sync and metadata sync calls to be executed at a fixed interval.
31+
*
32+
* @author Piotr Sowiński
33+
*/
34+
class DelayedSyncDirectoryWrapper extends FilterDirectory {
35+
36+
final private Logger logger = LoggerFactory.getLogger(getClass());
37+
38+
final private ScheduledExecutorService scheduler;
39+
40+
final private AtomicBoolean needsMetadataSync = new AtomicBoolean(false);
41+
42+
final private AtomicReference<IOException> lastSyncException = new AtomicReference<>(null);
43+
44+
final private HashSet<String> pendingSyncs = new HashSet<>();
45+
46+
final private int maxPendingSyncs;
47+
48+
/**
49+
* Creates a new instance of LuceneDirectoryWrapper.
50+
*
51+
* @param in the underlying directory
52+
* @param fsyncInterval the interval in milliseconds writes after which a fsync is performed
53+
* @param maxPendingSyncs the maximum number of pending syncs to accumulate before forcing a sync
54+
*/
55+
DelayedSyncDirectoryWrapper(Directory in, long fsyncInterval, int maxPendingSyncs) {
56+
super(in);
57+
assert fsyncInterval > 0;
58+
assert maxPendingSyncs > 0;
59+
this.maxPendingSyncs = maxPendingSyncs;
60+
scheduler = Executors.newScheduledThreadPool(1);
61+
scheduler.scheduleAtFixedRate(
62+
this::doSync,
63+
fsyncInterval,
64+
fsyncInterval,
65+
TimeUnit.MILLISECONDS
66+
);
67+
}
68+
69+
private void doSync() {
70+
List<String> toSync;
71+
synchronized (pendingSyncs) {
72+
toSync = new ArrayList<>(pendingSyncs);
73+
pendingSyncs.clear();
74+
}
75+
if (!toSync.isEmpty()) {
76+
try {
77+
super.sync(toSync);
78+
} catch (IOException e) {
79+
lastSyncException.set(e);
80+
logger.error("IO error during a periodic sync of Lucene index files", e);
81+
}
82+
}
83+
if (this.needsMetadataSync.getAndSet(false)) {
84+
try {
85+
super.syncMetaData();
86+
} catch (IOException e) {
87+
lastSyncException.set(e);
88+
logger.error("IO error during a periodic sync of Lucene index metadata", e);
89+
}
90+
}
91+
}
92+
93+
@Override
94+
public void sync(Collection<String> names) throws IOException {
95+
final IOException ex = lastSyncException.getAndSet(null);
96+
if (ex != null) {
97+
// Rethrow the last exception if there was one.
98+
// This will fail the current transaction, and not the one that caused the original exception.
99+
// But there is no other way to notify the caller of the error, as the sync is done asynchronously.
100+
throw ex;
101+
}
102+
synchronized (pendingSyncs) {
103+
pendingSyncs.addAll(names);
104+
if (pendingSyncs.size() >= maxPendingSyncs) {
105+
// If we have accumulated too many pending syncs, do a sync right away
106+
// to avoid excessive memory usage
107+
doSync();
108+
}
109+
}
110+
}
111+
112+
@Override
113+
public void syncMetaData() throws IOException {
114+
needsMetadataSync.set(true);
115+
}
116+
117+
@Override
118+
public void close() throws IOException {
119+
// Finish the current sync task, if in progress and then shut down
120+
try {
121+
scheduler.shutdown();
122+
} finally {
123+
// Do a final sync of any remaining files
124+
try {
125+
doSync();
126+
} finally {
127+
super.close();
128+
}
129+
}
130+
}
131+
}

core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Iterator;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.Objects;
2928
import java.util.Properties;
3029
import java.util.Set;
3130
import java.util.concurrent.atomic.AtomicBoolean;
@@ -227,6 +226,23 @@ protected Directory createDirectory(Properties parameters) throws IOException {
227226
throw new IOException("No luceneIndex set, and no '" + LuceneSail.LUCENE_DIR_KEY + "' or '"
228227
+ LuceneSail.LUCENE_RAMDIR_KEY + "' parameter given. ");
229228
}
229+
long fsyncInterval = 0;
230+
int maxPendingSyncs = 5000;
231+
try {
232+
var param = parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY, "0");
233+
fsyncInterval = Long.parseLong(param);
234+
} catch (NumberFormatException e) {
235+
logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_INTERVAL_KEY, e.getMessage());
236+
}
237+
try {
238+
var param = parameters.getProperty(LuceneSail.FSYNC_MAX_PENDING_FILES_KEY, "5000");
239+
maxPendingSyncs = Integer.parseInt(param);
240+
} catch (NumberFormatException e) {
241+
logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_MAX_PENDING_FILES_KEY, e.getMessage());
242+
}
243+
if (fsyncInterval > 0) {
244+
dir = new DelayedSyncDirectoryWrapper(dir, fsyncInterval, maxPendingSyncs);
245+
}
230246
return dir;
231247
}
232248

@@ -385,10 +401,16 @@ public void shutDown() throws IOException {
385401
}
386402
} finally {
387403
try {
388-
IndexWriter toCloseIndexWriter = indexWriter;
389-
indexWriter = null;
390-
if (toCloseIndexWriter != null) {
391-
toCloseIndexWriter.close();
404+
try {
405+
IndexWriter toCloseIndexWriter = indexWriter;
406+
indexWriter = null;
407+
if (toCloseIndexWriter != null) {
408+
toCloseIndexWriter.close();
409+
}
410+
} finally {
411+
// Close the directory -- if asynchronous fsync is used, this will clean
412+
// up the scheduler thread too.
413+
directory.close();
392414
}
393415
} finally {
394416
if (!exceptions.isEmpty()) {

0 commit comments

Comments
 (0)