allow warming partially an sstable for an automaton #2559

Merged: 15 commits, Jan 8, 2025
Changes from 4 commits

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -66,6 +66,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"

[target.'cfg(windows)'.dependencies]
@@ -120,7 +121,7 @@ zstd-compression = ["zstd"]
failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.

quickwit = ["sstable", "futures-util"]
quickwit = ["sstable", "futures-util", "futures-channel"]

# Compares only the hash of a string when indexing data.
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
77 changes: 50 additions & 27 deletions src/index/inverted_index_reader.rs
@@ -230,7 +230,7 @@ impl InvertedIndexReader {
terms: impl std::ops::RangeBounds<Term>,
automaton: A,
limit: Option<u64>,
merge_holes_under: usize,
merge_holes_under_bytes: usize,
) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
where
A::State: Clone,
@@ -254,7 +254,7 @@ impl InvertedIndexReader {
};

let mut stream = range_builder
.into_stream_async_merging_holes(merge_holes_under)
.into_stream_async_merging_holes(merge_holes_under_bytes)
.await?;

let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
@@ -334,52 +334,75 @@ impl InvertedIndexReader {
/// This method is for an advanced usage only.
///
/// returns a boolean, whether a term matching the range was found in the dictionary
pub async fn warm_postings_automaton<A: Automaton + Clone>(
pub async fn warm_postings_automaton<
A: Automaton + Clone + Send + 'static,
E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
F: std::future::Future<Output = io::Result<()>>,
>(
&self,
automaton: A,
// with_positions: bool, at the moment we have no use for it, and supporting it would add
// complexity to the coalesce
executor: E,
) -> io::Result<bool>
where
A::State: Clone,
{
// merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
// S3 (~80MiB/s, and 50ms latency)
let merge_holes_under = (80 * 1024 * 1024 * 50) / 1000;
const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
// we build a first iterator to download everything. Simply calling the function already
// loads everything, but doesn't start iterating over the sstable.
let mut _term_info = self
.get_term_range_async(.., automaton.clone(), None, merge_holes_under)
// downloads everything we need from the sstable, but doesn't start iterating over it.
let _term_info_iter = self
.get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
.await?;

// we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
// match, and just download them to reduce our query count. This makes the assumption
// there is a caching layer below, which might not always be true, but is in Quickwit.
let term_info = self.get_term_range_async(.., automaton, None, 0).await?;

// TODO this operation is often cheap for "friendly" automatons, but can be very costly for
// "unfriendly" ones such as ".*a{50}" (very few terms if any match this pattern, but we
// can't know early). In this case, we decompress and iterate over the entire sstable, while
// still being in async context. Ideally we should spawn this on a threadpool.
let range_to_load = term_info
.map(|term_info| term_info.postings_range)
.coalesce(|range1, range2| {
if range1.end + merge_holes_under >= range2.start {
Ok(range1.start..range2.end)
} else {
Err((range1, range2))
let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
let termdict = self.termdict.clone();
let cpu_bound_task = move || {
// then we build a 2nd iterator, this one with no holes, so we don't go through blocks
// we can't match.
// This makes the assumption there is a caching layer below us, which gives sync read
// for free after the initial async access. This might not always be true, but is in
// Quickwit.
// We build things from this closure otherwise we get into lifetime issues that can only
// be solved with self-referential structs. Returning an io::Result from here is a bit
// more leaky abstraction-wise, but a lot better than the alternative.
let mut stream = termdict.search(automaton).into_stream()?;

// we could do without an iterator, but this gives us access to coalesce, which simplifies
// things
let posting_ranges_iter =
std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()))
Collaborator review comment:

Can we break this into several statements? For instance:

  • create the iterator of postings ranges
  • do the coalesce after

.coalesce(|range1, range2| {
if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
Ok(range1.start..range2.end)
} else {
Err((range1, range2))
}
});

for posting_range in posting_ranges_iter {
if sender.unbounded_send(posting_range).is_err() {
// this should happen only when search is cancelled
return Err(io::Error::other("failed to send posting range back"));
}
});
}
Ok(())
};
let task_handle = executor(Box::new(cpu_bound_task));

let slices_downloaded = futures_util::stream::iter(range_to_load)
let posting_downloader = posting_ranges_to_load_stream
.map(|posting_slice| {
self.postings_file_slice
.read_bytes_slice_async(posting_slice)
.map(|result| result.map(|_slice| ()))
})
.buffer_unordered(5)
.try_collect::<Vec<()>>()
.await?;
.try_collect::<Vec<()>>();

let (_, slices_downloaded) =
futures_util::future::try_join(task_handle, posting_downloader).await?;

Ok(!slices_downloaded.is_empty())
}
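The new `executor` parameter leaves it to the caller where the CPU-bound sstable scan runs, so an "unfriendly" automaton such as `.*a{50}` no longer stalls the async runtime while the dictionary is decompressed and walked. A minimal sketch of a call site, assuming a tokio runtime; the import paths and the `warm_up` wrapper are illustrative assumptions, not code from this PR:

```rust
use std::io;

use tantivy::index::InvertedIndexReader; // assumed path
use tantivy_fst::Automaton; // assumed path

// Hypothetical call site: run the potentially expensive dictionary scan on
// tokio's blocking threadpool while the postings download stays on the
// async runtime.
async fn warm_up<A>(inv_index: &InvertedIndexReader, automaton: A) -> io::Result<bool>
where
    A: Automaton + Clone + Send + 'static,
    A::State: Clone,
{
    inv_index
        .warm_postings_automaton(automaton, |cpu_bound_task| async move {
            // spawn_blocking wants FnOnce() -> R + Send + 'static, which the
            // boxed task satisfies; fold the JoinError into io::Error.
            tokio::task::spawn_blocking(cpu_bound_task)
                .await
                .map_err(io::Error::other)?
        })
        .await
}
```

Inside `warm_postings_automaton`, `try_join` then ties the two halves together: the blocking scan feeds posting ranges into the unbounded channel while `buffer_unordered(5)` drains it, five concurrent range reads at a time.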
1 change: 1 addition & 0 deletions src/termdict/mod.rs
@@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;

// TODO in the future this should become an enum of supported dictionaries
/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
#[cfg_attr(feature = "quickwit", derive(Clone))]
pub struct TermDictionary(InnerTermDict);

impl TermDictionary {
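The conditional derive keeps `Clone` off `TermDictionary` unless the `quickwit` feature is enabled. A tiny illustration of the `cfg_attr` pattern on a hypothetical type (not from this PR):

```rust
// With `--features quickwit` this expands to #[derive(Clone)];
// without the feature, no Clone impl is generated at all.
#[cfg_attr(feature = "quickwit", derive(Clone))]
pub struct TermDictionaryLike {
    // An Arc keeps the conditional Clone cheap: cloning bumps a
    // refcount instead of copying dictionary bytes.
    data: std::sync::Arc<[u8]>,
}
```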
1 change: 1 addition & 0 deletions src/termdict/sstable_termdict/mod.rs
@@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;

/// SSTable used to store TermInfo objects.
#[derive(Clone)]
pub struct TermSSTable;

pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
14 changes: 7 additions & 7 deletions sstable/src/block_match_automaton.rs
@@ -52,10 +52,10 @@ fn can_block_match_automaton_with_start(
// - keys are `abcd` and `abce` => we test for abc[d-e].*
// - keys are `abcd` and `abc` => contradiction with start_key < end_key.
//
// ideally for ]abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
// ideally for (abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
// but let's start simple (and correct), and tighten our bounds later
//
// and for ]abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
// and for (abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
// abc (
// d(e.+|[f-\xff].*) |
// e.* |
@@ -69,8 +69,8 @@ fn can_block_match_automaton_with_start(
// - ? is the thing before can_match(), or current state.is_match()
// - | means test both sides

// we have two cases, either start_key is a prefix of end_key (e.g. ]abc, abcjp]),
// or it is not (e.g. ]abcdg, abcjp]). It is not possible however that end_key be a prefix of
// we have two cases, either start_key is a prefix of end_key (e.g. (abc, abcjp]),
// or it is not (e.g. (abcdg, abcjp]). It is not possible however that end_key be a prefix of
// start_key (or that both are equal) because we already handled start_key >= end_key.
//
// if we are in the first case, we want to visit the following states:
@@ -103,7 +103,7 @@

// things starting with start_range were handled in match_range_start
// those starting with end_range are handled below.
// this can run for 0 iteration in cases such as ]abc, abd]
// this can run for 0 iterations in cases such as (abc, abd]
for rb in (start_range + 1)..end_range {
let new_state = automaton.accept(&base_state, rb);
if automaton.can_match(&new_state) {
@@ -132,7 +132,7 @@ fn match_range_start<S, A: Automaton<State = S>>(
automaton: &A,
mut state: S,
) -> bool {
// case ]abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
// case (abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
// - [h-\xff].*
// - g[k-\xff].*
// - gj.+ == gj[\0-\xff].*
@@ -177,7 +177,7 @@ fn match_range_end<S, A: Automaton<State = S>>(
automaton: &A,
mut state: S,
) -> bool {
// for ]abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
// for (abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
// we just need to handle
// - [\0-o].*
// - p
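The interval notation in these comments is half-open: a block bounded below by `start_key` (exclusive, the last key of the previous block) and above by `end_key` (inclusive) covers exactly the keys in (start_key, end_key]. A self-contained toy version of the pruning test, with a plain prefix check standing in for the automaton (an illustration of the range reasoning, not the PR's code):

```rust
/// Toy stand-in for `can_block_match_automaton`: can any key in the block
/// range (start_key, end_key] start with `prefix`?
fn block_may_contain_prefix(start_key: &[u8], end_key: &[u8], prefix: &[u8]) -> bool {
    // Some key with this prefix sorts after start_key iff the prefix itself
    // does, or start_key already begins with the prefix (then e.g.
    // start_key ++ \0 is a strictly greater key with the same prefix).
    let can_exceed_start = prefix > start_key || start_key.starts_with(prefix);
    // The smallest key with this prefix is the prefix itself, so some key
    // with the prefix is <= end_key iff prefix <= end_key.
    let can_stay_below_end = prefix <= end_key;
    can_exceed_start && can_stay_below_end
}

fn main() {
    // block (abc, abf]
    assert!(block_may_contain_prefix(b"abc", b"abf", b"abd")); // "abd..." fits
    assert!(block_may_contain_prefix(b"abc", b"abf", b"abc")); // "abc\0" > "abc"
    assert!(!block_may_contain_prefix(b"abc", b"abf", b"ac")); // whole range sorts below "ac"
}
```

The real `can_block_match_automaton_with_start` walks automaton states over the shared prefix and the boundary bytes instead of comparing against a fixed prefix, but the range reasoning is the same.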
8 changes: 4 additions & 4 deletions sstable/src/dictionary.rs
@@ -101,7 +101,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
automaton: &impl Automaton,
merge_holes_under: usize,
merge_holes_under_bytes: usize,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let match_all = automaton.will_always_match(&automaton.start());
if match_all {
@@ -112,7 +112,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
key_range,
automaton,
merge_holes_under,
merge_holes_under_bytes,
));
let data = blocks
.map(|block_addr| {
@@ -242,7 +242,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
&'a self,
key_range: impl RangeBounds<[u8]>,
automaton: &'a impl Automaton,
merge_holes_under: usize,
merge_holes_under_bytes: usize,
) -> impl Iterator<Item = BlockAddr> + 'a {
let lower_bound = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
Expand All @@ -263,7 +263,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
.filter(move |(block_id, _)| block_range.contains(block_id))
.map(|(_, block_addr)| block_addr)
.coalesce(move |first, second| {
if first.byte_range.end + merge_holes_under >= second.byte_range.start {
if first.byte_range.end + merge_holes_under_bytes >= second.byte_range.start {
Ok(BlockAddr {
first_ordinal: first.first_ordinal,
byte_range: first.byte_range.start..second.byte_range.end,
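The hole-merging in `get_block_iterator_for_range_and_automaton` is a single `coalesce` pass over block byte ranges, which arrive in key order. A standalone sketch of just that rule, using the same itertools `coalesce` adapter as the code above:

```rust
use std::ops::Range;

use itertools::Itertools;

/// Fuse consecutive reads when the hole between them is smaller than
/// `merge_holes_under_bytes`, mirroring the rule above.
fn merge_ranges(
    ranges: impl IntoIterator<Item = Range<usize>>,
    merge_holes_under_bytes: usize,
) -> Vec<Range<usize>> {
    ranges
        .into_iter()
        .coalesce(|first, second| {
            if first.end + merge_holes_under_bytes >= second.start {
                // close enough: issue one read spanning the hole
                Ok(first.start..second.end)
            } else {
                // hole too large: keep two separate reads
                Err((first, second))
            }
        })
        .collect()
}

#[test]
fn merges_small_holes_only() {
    // with a 16-byte threshold, the 2-byte hole is merged, the large one is not
    let merged = merge_ranges([0..10, 12..20, 1000..1010], 16);
    assert_eq!(merged, vec![0..20, 1000..1010]);
}
```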
10 changes: 5 additions & 5 deletions sstable/src/streamer.rs
@@ -91,7 +91,7 @@

async fn delta_reader_async(
&self,
merge_holes_under: usize,
merge_holes_under_bytes: usize,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let key_range = (
bound_as_byte_slice(&self.lower),
@@ -102,7 +102,7 @@
key_range,
self.limit,
&self.automaton,
merge_holes_under,
merge_holes_under_bytes,
)
.await
}
@@ -142,12 +142,12 @@
}

/// Same as `into_stream_async`, but tries to issue a single io operation when requesting
/// blocks that are not consecutive, but also less than `merge_holes_under` bytes appart.
/// blocks that are not consecutive, but also less than `merge_holes_under_bytes` bytes apart.
pub async fn into_stream_async_merging_holes(
self,
merge_holes_under: usize,
merge_holes_under_bytes: usize,
) -> io::Result<Streamer<'a, TSSTable, A>> {
let delta_reader = self.delta_reader_async(merge_holes_under).await?;
let delta_reader = self.delta_reader_async(merge_holes_under_bytes).await?;
self.into_stream_given_delta_reader(delta_reader)
}

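A hypothetical call site for the new method; the dictionary type, import paths, and `search` entry point are assumptions for illustration (the 4 MiB threshold mirrors the warm-up code earlier in this PR):

```rust
use std::io;

use tantivy_fst::Automaton; // assumed path
use tantivy_sstable::{Dictionary, MonotonicU64SSTable}; // assumed paths

// Stream every entry matching `automaton`, letting the reader fetch
// non-consecutive blocks in one IO when the hole between them is small.
async fn count_matching<A: Automaton>(
    dict: &Dictionary<MonotonicU64SSTable>,
    automaton: A,
) -> io::Result<u64> {
    let mut stream = dict
        .search(automaton) // builds the streamer, as the PR's own code does
        .into_stream_async_merging_holes(4 * 1024 * 1024)
        .await?;
    let mut count = 0;
    while let Some((_key, _value)) = stream.next() {
        count += 1; // consume matching entries
    }
    Ok(count)
}
```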