Skip to content

Commit 8f2d4b7

Browse files
committed
GH-5291 Add asynchronous fsync to LuceneSail
1 parent 18f1bef commit 8f2d4b7

File tree

5 files changed

+394
-1
lines changed

5 files changed

+394
-1
lines changed

core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,14 @@ public class LuceneSail extends NotifyingSailWrapper {
289289
*/
290290
public static final String LUCENE_RAMDIR_KEY = "useramdir";
291291

292+
/**
293+
* Set the key "fsyncInterval=<t>" as sail parameter to configure the interval in milliseconds in which fsync
294+
* is called on the Lucene index. Set to 0 or a negative value to call fsync synchronously after each operation.
295+
* Default is 0. Setting this parameter to a positive value will improve performance for frequent writes, but may
296+
* cause the loss of the last few operations in case of a crash.
297+
*/
298+
public static final String FSYNC_INTERVAL_KEY = "fsyncInterval";
299+
292300
/**
293301
* Set the key "defaultNumDocs=<n>" as sail parameter to limit the maximum number of documents to return from
294302
* a search query. The default is to return all documents. NB: this may involve extra cost for some SearchIndex
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.sail.lucene.impl;
12+
13+
import java.io.IOException;
14+
import java.util.ArrayList;
15+
import java.util.Collection;
16+
import java.util.HashSet;
17+
import java.util.List;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
23+
import org.apache.lucene.store.Directory;
24+
import org.apache.lucene.store.FilterDirectory;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
/**
29+
* Wrapper around a Lucene Directory that batches sync and metadata sync calls to be executed at a fixed interval.
30+
*
31+
* @author Piotr Sowiński
32+
*/
33+
class DelayedSyncDirectoryWrapper extends FilterDirectory {
34+
35+
private final Logger logger = LoggerFactory.getLogger(getClass());
36+
37+
final private ScheduledExecutorService scheduler;
38+
39+
final private AtomicBoolean needsMetadataSync = new AtomicBoolean(false);
40+
41+
final private HashSet<String> pendingSyncs = new HashSet<>();
42+
43+
/**
44+
* Creates a new instance of LuceneDirectoryWrapper.
45+
*
46+
* @param in the underlying directory
47+
* @param fsyncInterval the interval in milliseconds writes after which a fsync is performed
48+
*/
49+
DelayedSyncDirectoryWrapper(Directory in, long fsyncInterval) {
50+
super(in);
51+
assert fsyncInterval > 0;
52+
scheduler = Executors.newScheduledThreadPool(1);
53+
scheduler.scheduleAtFixedRate(
54+
this::doSync,
55+
fsyncInterval,
56+
fsyncInterval,
57+
TimeUnit.MILLISECONDS
58+
);
59+
}
60+
61+
private void doSync() {
62+
List<String> toSync;
63+
synchronized (pendingSyncs) {
64+
toSync = new ArrayList<>(pendingSyncs);
65+
pendingSyncs.clear();
66+
}
67+
if (!toSync.isEmpty()) {
68+
try {
69+
super.sync(toSync);
70+
} catch (IOException e) {
71+
logger.error("IO error during a periodic sync of Lucene index files", e);
72+
}
73+
}
74+
if (this.needsMetadataSync.getAndSet(false)) {
75+
try {
76+
super.syncMetaData();
77+
} catch (IOException e) {
78+
logger.error("IO error during a periodic sync of Lucene index metadata", e);
79+
}
80+
}
81+
}
82+
83+
@Override
84+
public void sync(Collection<String> names) throws IOException {
85+
synchronized (pendingSyncs) {
86+
pendingSyncs.addAll(names);
87+
}
88+
}
89+
90+
@Override
91+
public void syncMetaData() throws IOException {
92+
needsMetadataSync.set(true);
93+
}
94+
95+
@Override
96+
public void close() throws IOException {
97+
// Finish the current sync task, if in progress and then shut down
98+
scheduler.shutdown();
99+
// Do a final sync of any remaining files
100+
doSync();
101+
super.close();
102+
}
103+
}

core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Iterator;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.Objects;
2928
import java.util.Properties;
3029
import java.util.Set;
3130
import java.util.concurrent.atomic.AtomicBoolean;
@@ -227,6 +226,16 @@ protected Directory createDirectory(Properties parameters) throws IOException {
227226
throw new IOException("No luceneIndex set, and no '" + LuceneSail.LUCENE_DIR_KEY + "' or '"
228227
+ LuceneSail.LUCENE_RAMDIR_KEY + "' parameter given. ");
229228
}
229+
long fsyncInterval = 0;
230+
try {
231+
var param = parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY, "0");
232+
fsyncInterval = Long.parseLong(param);
233+
} catch (NumberFormatException e) {
234+
logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_INTERVAL_KEY, e.getMessage());
235+
}
236+
if (fsyncInterval > 0) {
237+
dir = new DelayedSyncDirectoryWrapper(dir, fsyncInterval);
238+
}
230239
return dir;
231240
}
232241

@@ -390,6 +399,9 @@ public void shutDown() throws IOException {
390399
if (toCloseIndexWriter != null) {
391400
toCloseIndexWriter.close();
392401
}
402+
// Close the directory -- if asynchronous fsync is used, this will clean
403+
// up the scheduler thread too.
404+
directory.close();
393405
} finally {
394406
if (!exceptions.isEmpty()) {
395407
throw new UndeclaredThrowableException(exceptions.get(0));
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.sail.lucene.impl;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
15+
import java.io.IOException;
16+
import java.util.Collection;
17+
import java.util.List;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
import org.apache.lucene.analysis.standard.StandardAnalyzer;
21+
import org.apache.lucene.store.Directory;
22+
import org.apache.lucene.store.FilterDirectory;
23+
import org.apache.lucene.store.RAMDirectory;
24+
import org.junit.jupiter.api.Test;
25+
26+
/**
27+
* @author Piotr Sowiński
28+
*/
29+
public class DelayedSyncDirectoryWrapperTest {
30+
31+
private static final class TrackingDirectory extends FilterDirectory {
32+
private AtomicInteger syncCount = new AtomicInteger(0);
33+
private AtomicInteger metaSyncCount = new AtomicInteger(0);
34+
private AtomicInteger closeCount = new AtomicInteger(0);
35+
36+
TrackingDirectory(Directory in) {
37+
super(in);
38+
}
39+
40+
@Override
41+
public void sync(Collection<String> names) throws IOException {
42+
syncCount.getAndIncrement();
43+
super.sync(names);
44+
}
45+
46+
@Override
47+
public void syncMetaData() throws IOException {
48+
metaSyncCount.getAndIncrement();
49+
super.syncMetaData();
50+
}
51+
52+
@Override
53+
public void close() throws IOException {
54+
closeCount.getAndIncrement();
55+
super.close();
56+
}
57+
58+
public int getSyncCount() {
59+
return syncCount.get();
60+
}
61+
62+
public int getMetaSyncCount() {
63+
return metaSyncCount.get();
64+
}
65+
66+
public int getCloseCount() {
67+
return closeCount.get();
68+
}
69+
}
70+
71+
@Test
72+
public void testSyncData() throws IOException {
73+
final var dir = new TrackingDirectory(new RAMDirectory());
74+
final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500);
75+
76+
assertEquals(0, dir.getSyncCount());
77+
assertEquals(0, dir.getMetaSyncCount());
78+
79+
delayedDir.sync(List.of("file1", "file2"));
80+
81+
// The sync should be delayed, so the count should still be 0
82+
assertEquals(0, dir.getSyncCount());
83+
84+
// Wait for more than the fsync interval to allow the scheduled task to run
85+
waitFor(700);
86+
assertEquals(1, dir.getSyncCount());
87+
// Meta sync should still be 0
88+
assertEquals(0, dir.getMetaSyncCount());
89+
90+
delayedDir.close();
91+
92+
// No additional syncs should have occurred after close
93+
assertEquals(1, dir.getSyncCount());
94+
assertEquals(0, dir.getMetaSyncCount());
95+
assertEquals(1, dir.getCloseCount());
96+
}
97+
98+
@Test
99+
public void testSyncMetaData() throws IOException {
100+
final var dir = new TrackingDirectory(new RAMDirectory());
101+
final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500);
102+
103+
assertEquals(0, dir.getSyncCount());
104+
assertEquals(0, dir.getMetaSyncCount());
105+
106+
delayedDir.syncMetaData();
107+
108+
// The meta sync should be delayed, so the count should still be 0
109+
assertEquals(0, dir.getMetaSyncCount());
110+
111+
// Wait for more than the fsync interval to allow the scheduled task to run
112+
waitFor(700);
113+
assertEquals(1, dir.getMetaSyncCount());
114+
// Regular sync should still be 0
115+
assertEquals(0, dir.getSyncCount());
116+
117+
delayedDir.close();
118+
119+
// No additional syncs should have occurred after close
120+
assertEquals(0, dir.getSyncCount());
121+
assertEquals(1, dir.getMetaSyncCount());
122+
assertEquals(1, dir.getCloseCount());
123+
}
124+
125+
@Test
126+
public void testSyncMixed() throws IOException {
127+
final var dir = new TrackingDirectory(new RAMDirectory());
128+
final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500);
129+
130+
assertEquals(0, dir.getSyncCount());
131+
assertEquals(0, dir.getMetaSyncCount());
132+
133+
delayedDir.sync(List.of("file1", "file2"));
134+
delayedDir.sync(List.of("file2", "file456"));
135+
delayedDir.syncMetaData();
136+
delayedDir.syncMetaData();
137+
delayedDir.syncMetaData();
138+
139+
// The syncs should be delayed, so the counts should still be 0
140+
assertEquals(0, dir.getSyncCount());
141+
assertEquals(0, dir.getMetaSyncCount());
142+
143+
// Wait for more than the fsync interval to allow the scheduled task to run
144+
waitFor(700);
145+
assertEquals(1, dir.getSyncCount());
146+
assertEquals(1, dir.getMetaSyncCount());
147+
148+
delayedDir.sync(List.of("file2", "file456"));
149+
delayedDir.syncMetaData();
150+
151+
waitFor(700);
152+
assertEquals(2, dir.getSyncCount());
153+
assertEquals(2, dir.getMetaSyncCount());
154+
155+
// Wait again to ensure no extra syncs occur
156+
waitFor(700);
157+
assertEquals(2, dir.getSyncCount());
158+
assertEquals(2, dir.getMetaSyncCount());
159+
160+
delayedDir.close();
161+
162+
// No additional syncs should have occurred after close
163+
assertEquals(2, dir.getSyncCount());
164+
assertEquals(2, dir.getMetaSyncCount());
165+
assertEquals(1, dir.getCloseCount());
166+
}
167+
168+
@Test
169+
public void testSyncMixed_afterClose() throws IOException {
170+
final var dir = new TrackingDirectory(new RAMDirectory());
171+
final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500);
172+
173+
assertEquals(0, dir.getSyncCount());
174+
assertEquals(0, dir.getMetaSyncCount());
175+
176+
delayedDir.sync(List.of("file1", "file2"));
177+
delayedDir.sync(List.of("file2", "file456"));
178+
delayedDir.syncMetaData();
179+
delayedDir.syncMetaData();
180+
delayedDir.syncMetaData();
181+
182+
assertEquals(0, dir.getSyncCount());
183+
assertEquals(0, dir.getMetaSyncCount());
184+
185+
delayedDir.close();
186+
187+
// The syncs should be executed on close
188+
assertEquals(1, dir.getSyncCount());
189+
assertEquals(1, dir.getMetaSyncCount());
190+
assertEquals(1, dir.getCloseCount());
191+
}
192+
193+
@Test
194+
public void testCloseOnIndexShutDown() throws IOException {
195+
final var dir = new TrackingDirectory(new RAMDirectory());
196+
final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500);
197+
final var index = new LuceneIndex(delayedDir, new StandardAnalyzer());
198+
assertEquals(0, dir.getCloseCount());
199+
200+
index.shutDown();
201+
202+
assertEquals(1, dir.getCloseCount());
203+
}
204+
205+
private void waitFor(long millis) {
206+
try {
207+
Thread.sleep(millis);
208+
} catch (InterruptedException e) {
209+
Thread.currentThread().interrupt();
210+
}
211+
}
212+
}

0 commit comments

Comments
 (0)