
Commit d281ca3

Merge pull request #2559 from quickwit-oss/trinity/sstable-partial-automaton
allow warming partially an sstable for an automaton
2 parents: 71cf198 + be17daf

15 files changed: +874 -54 lines changed


Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -67,6 +67,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-
 sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
 hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
 futures-util = { version = "0.3.28", optional = true }
+futures-channel = { version = "0.3.28", optional = true }
 fnv = "1.0.7"
 
 [target.'cfg(windows)'.dependencies]
@@ -121,7 +122,7 @@ zstd-compression = ["zstd"]
 failpoints = ["fail", "fail/failpoints"]
 unstable = [] # useful for benches.
 
-quickwit = ["sstable", "futures-util"]
+quickwit = ["sstable", "futures-util", "futures-channel"]
 
 # Compares only the hash of a string when indexing data.
 # Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.

src/index/inverted_index_reader.rs

Lines changed: 99 additions & 6 deletions
@@ -3,6 +3,12 @@ use std::io;
 use common::json_path_writer::JSON_END_OF_PATH;
 use common::BinarySerializable;
 use fnv::FnvHashSet;
+#[cfg(feature = "quickwit")]
+use futures_util::{FutureExt, StreamExt, TryStreamExt};
+#[cfg(feature = "quickwit")]
+use itertools::Itertools;
+#[cfg(feature = "quickwit")]
+use tantivy_fst::automaton::{AlwaysMatch, Automaton};
 
 use crate::directory::FileSlice;
 use crate::positions::PositionReader;
@@ -219,13 +225,18 @@ impl InvertedIndexReader {
         self.termdict.get_async(term.serialized_value_bytes()).await
     }
 
-    async fn get_term_range_async(
-        &self,
+    async fn get_term_range_async<'a, A: Automaton + 'a>(
+        &'a self,
         terms: impl std::ops::RangeBounds<Term>,
+        automaton: A,
         limit: Option<u64>,
-    ) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
+        merge_holes_under_bytes: usize,
+    ) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
+    where
+        A::State: Clone,
+    {
         use std::ops::Bound;
-        let range_builder = self.termdict.range();
+        let range_builder = self.termdict.search(automaton);
         let range_builder = match terms.start_bound() {
             Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
             Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
@@ -242,7 +253,9 @@ impl InvertedIndexReader {
             range_builder
         };
 
-        let mut stream = range_builder.into_stream_async().await?;
+        let mut stream = range_builder
+            .into_stream_async_merging_holes(merge_holes_under_bytes)
+            .await?;
 
         let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
 
@@ -288,7 +301,9 @@ impl InvertedIndexReader {
         limit: Option<u64>,
         with_positions: bool,
     ) -> io::Result<bool> {
-        let mut term_info = self.get_term_range_async(terms, limit).await?;
+        let mut term_info = self
+            .get_term_range_async(terms, AlwaysMatch, limit, 0)
+            .await?;
 
         let Some(first_terminfo) = term_info.next() else {
             // no key matches, nothing more to load
@@ -315,6 +330,84 @@ impl InvertedIndexReader {
         Ok(true)
     }
 
+    /// Warmup a block postings given a range of `Term`s.
+    /// This method is for an advanced usage only.
+    ///
+    /// returns a boolean, whether a term matching the range was found in the dictionary
+    pub async fn warm_postings_automaton<
+        A: Automaton + Clone + Send + 'static,
+        E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
+        F: std::future::Future<Output = io::Result<()>>,
+    >(
+        &self,
+        automaton: A,
+        // with_positions: bool, at the moment we have no use for it, and supporting it would add
+        // complexity to the coalesce
+        executor: E,
+    ) -> io::Result<bool>
+    where
+        A::State: Clone,
+    {
+        // merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
+        // S3 (~80MiB/s, and 50ms latency)
+        const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
+        // we build a first iterator to download everything. Simply calling the function already
+        // download everything we need from the sstable, but doesn't start iterating over it.
+        let _term_info_iter = self
+            .get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
+            .await?;
+
+        let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
+        let termdict = self.termdict.clone();
+        let cpu_bound_task = move || {
+            // then we build a 2nd iterator, this one with no holes, so we don't go through blocks
+            // we can't match.
+            // This makes the assumption there is a caching layer below us, which gives sync read
+            // for free after the initial async access. This might not always be true, but is in
+            // Quickwit.
+            // We build things from this closure otherwise we get into lifetime issues that can only
+            // be solved with self referential strucs. Returning an io::Result from here is a bit
+            // more leaky abstraction-wise, but a lot better than the alternative
+            let mut stream = termdict.search(automaton).into_stream()?;
+
+            // we could do without an iterator, but this allows us access to coalesce which simplify
+            // things
+            let posting_ranges_iter =
+                std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()));
+
+            let merged_posting_ranges_iter = posting_ranges_iter.coalesce(|range1, range2| {
+                if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
+                    Ok(range1.start..range2.end)
+                } else {
+                    Err((range1, range2))
+                }
+            });
+
+            for posting_range in merged_posting_ranges_iter {
+                if let Err(_) = sender.unbounded_send(posting_range) {
+                    // this should happen only when search is cancelled
+                    return Err(io::Error::other("failed to send posting range back"));
+                }
+            }
+            Ok(())
+        };
+        let task_handle = executor(Box::new(cpu_bound_task));
+
+        let posting_downloader = posting_ranges_to_load_stream
+            .map(|posting_slice| {
+                self.postings_file_slice
+                    .read_bytes_slice_async(posting_slice)
+                    .map(|result| result.map(|_slice| ()))
+            })
+            .buffer_unordered(5)
+            .try_collect::<Vec<()>>();
+
+        let (_, slices_downloaded) =
+            futures_util::future::try_join(task_handle, posting_downloader).await?;
+
+        Ok(!slices_downloaded.is_empty())
+    }
+
     /// Warmup the block postings for all terms.
     /// This method is for an advanced usage only.
     ///

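Taken together, the new `warm_postings_automaton` works in two passes: a first async pass over the term dictionary downloads every sstable block the automaton can reach, merging holes smaller than `MERGE_HOLES_UNDER_BYTES` (80 MiB/s × 50 ms = 4 MiB); a second, synchronous pass then runs on a caller-supplied executor, walks the now-cached dictionary, coalesces neighbouring posting ranges, and streams them over an unbounded channel so the posting file slices can be fetched with up to 5 concurrent reads. Below is a minimal caller-side sketch of how the new API might be invoked (requires the `quickwit` feature; the function name and the inline executor are illustrative, not part of this commit):

use std::io;
use std::sync::Arc;

use tantivy::InvertedIndexReader;
use tantivy_fst::automaton::Automaton;

// Caller-side sketch (illustrative, not part of this commit). `inverted_index`
// would typically come from a segment reader, and `automaton` from e.g. a
// regex or fuzzy term query.
async fn warm_matching_postings<A>(
    inverted_index: Arc<InvertedIndexReader>,
    automaton: A,
) -> io::Result<bool>
where
    A: Automaton + Clone + Send + 'static,
    A::State: Clone,
{
    inverted_index
        .warm_postings_automaton(automaton, |cpu_bound_task| async move {
            // The executor closure decides where the synchronous dictionary walk
            // runs; a real caller would likely hand it to a blocking/search thread
            // pool, but running it inline keeps the sketch self-contained.
            cpu_bound_task()
        })
        .await
}
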
src/termdict/fst_termdict/term_info_store.rs

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ impl TermInfoBlockMeta {
     }
 }
 
+#[derive(Clone)]
 pub struct TermInfoStore {
     num_terms: usize,
     block_meta_bytes: OwnedBytes,

src/termdict/fst_termdict/termdict.rs

Lines changed: 4 additions & 2 deletions
@@ -1,4 +1,5 @@
 use std::io::{self, Write};
+use std::sync::Arc;
 
 use common::{BinarySerializable, CountingWriter};
 use once_cell::sync::Lazy;
@@ -113,8 +114,9 @@ static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
 /// The `Fst` crate is used to associate terms to their
 /// respective `TermOrdinal`. The `TermInfoStore` then makes it
 /// possible to fetch the associated `TermInfo`.
+#[derive(Clone)]
 pub struct TermDictionary {
-    fst_index: tantivy_fst::Map<OwnedBytes>,
+    fst_index: Arc<tantivy_fst::Map<OwnedBytes>>,
     term_info_store: TermInfoStore,
 }
 
@@ -136,7 +138,7 @@ impl TermDictionary {
         let fst_index = open_fst_index(fst_file_slice)?;
         let term_info_store = TermInfoStore::open(values_file_slice)?;
         Ok(TermDictionary {
-            fst_index,
+            fst_index: Arc::new(fst_index),
             term_info_store,
         })
     }

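The `Arc` wrapper, together with the `#[derive(Clone)]` additions in this and the surrounding files, is what allows `warm_postings_automaton` to call `self.termdict.clone()` and move the dictionary into the CPU-bound closure. A toy illustration of the pattern, with a placeholder type standing in for `Arc<tantivy_fst::Map<OwnedBytes>>` (assumption: the point of the change is that cloning only bumps reference counts rather than copying the dictionary bytes):

use std::sync::Arc;
use std::thread;

// Stand-in for the real TermDictionary; the Arc-wrapped field makes Clone cheap.
#[derive(Clone)]
struct Dictionary {
    fst_index: Arc<Vec<u8>>, // Arc clone only bumps a refcount; bytes are shared
}

fn main() {
    let dict = Dictionary { fst_index: Arc::new(vec![0u8; 1024]) };
    // A clone can now be moved into another task or thread, as
    // warm_postings_automaton does with the term dictionary, without
    // duplicating the dictionary data.
    let dict_for_task = dict.clone();
    let handle = thread::spawn(move || dict_for_task.fst_index.len());
    assert_eq!(handle.join().unwrap(), dict.fst_index.len());
}
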
src/termdict/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
 
 // TODO in the future this should become an enum of supported dictionaries
 /// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
+#[derive(Clone)]
 pub struct TermDictionary(InnerTermDict);
 
 impl TermDictionary {

src/termdict/sstable_termdict/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
 pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
 
 /// SSTable used to store TermInfo objects.
+#[derive(Clone)]
 pub struct TermSSTable;
 
 pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;

sstable/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,8 @@ description = "sstables for tantivy"
 
 [dependencies]
 common = {version= "0.7", path="../common", package="tantivy-common"}
+futures-util = "0.3.30"
+itertools = "0.13.0"
 tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
 tantivy-fst = "0.5"
 # experimental gives us access to Decompressor::upper_bound
