Skip to content

Commit 2c90069

Browse files
committed
introducign asymmetric reader/writer
1 parent d33cfa5 commit 2c90069

File tree

3 files changed

+151
-206
lines changed

3 files changed

+151
-206
lines changed

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,6 @@ pub(crate) struct FileBackedIndex {
6666
delete_tasks: Vec<DeleteTask>,
6767
/// Stamper.
6868
stamper: Stamper,
69-
/// Flag used to avoid polling the metastore if
70-
/// the process is actually writing the metastore.
71-
///
72-
/// The logic is "soft". We avoid the polling step
73-
/// if the metastore wrote some value since the last
74-
/// polling loop.
75-
recently_modified: bool,
76-
7769
}
7870

7971
#[cfg(any(test, feature = "testsuite"))]
@@ -150,7 +142,6 @@ impl From<IndexMetadata> for FileBackedIndex {
150142
per_source_shards,
151143
delete_tasks: Default::default(),
152144
stamper: Default::default(),
153-
recently_modified: false,
154145
}
155146
}
156147
}
@@ -185,20 +176,9 @@ impl FileBackedIndex {
185176
per_source_shards,
186177
delete_tasks,
187178
stamper: Stamper::new(last_opstamp),
188-
recently_modified: false,
189179
}
190180
}
191181

192-
/// Sets the `recently_modified` flag to false and returns the previous value.
193-
pub fn flip_recently_modified_down(&mut self) -> bool {
194-
std::mem::replace(&mut self.recently_modified, false)
195-
}
196-
197-
/// Marks the file as `recently_modified`.
198-
pub fn set_recently_modified(&mut self) {
199-
self.recently_modified = true;
200-
}
201-
202182
/// Index ID accessor.
203183
pub fn index_id(&self) -> &str {
204184
self.metadata.index_id()

quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,19 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError, MetastoreResult};
2424
use quickwit_proto::types::IndexId;
2525
use quickwit_storage::Storage;
2626
use tokio::sync::{Mutex, OnceCell};
27-
use tokio::time::Instant;
2827
use tracing::error;
2928

3029
use super::file_backed_index::FileBackedIndex;
3130
use super::store_operations::{load_index, METASTORE_FILE_NAME};
32-
use super::FileBackedIndexCell;
31+
use super::{FileBackedIndexCell, FileBackedIndexWriter};
3332

3433
/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
3534
/// loaded, it optionally spawns a task to periodically poll the storage and update the index.
3635
pub(crate) struct LazyFileBackedIndex {
3736
index_id: IndexId,
3837
storage: Arc<dyn Storage>,
3938
polling_interval_opt: Option<Duration>,
40-
lazy_index: OnceCell<Arc<Mutex<FileBackedIndexCell>>>,
39+
lazy_index: OnceCell<FileBackedIndexCell>,
4140
}
4241

4342
impl LazyFileBackedIndex {
@@ -49,15 +48,15 @@ impl LazyFileBackedIndex {
4948
file_backed_index: Option<FileBackedIndex>,
5049
) -> Self {
5150
let index_mutex_opt =
52-
file_backed_index.map(|index| Arc::new(Mutex::new(FileBackedIndexCell::new(index))));
51+
file_backed_index.map(|index| FileBackedIndexCell::new(index));
5352
// If the polling interval is configured and the index is already loaded,
5453
// spawn immediately the polling task
5554
if let Some(index_mutex) = &index_mutex_opt {
5655
if let Some(polling_interval) = polling_interval_opt {
5756
spawn_index_metadata_polling_task(
5857
storage.clone(),
5958
index_id.clone(),
60-
Arc::downgrade(index_mutex),
59+
Arc::downgrade(&index_mutex.writer),
6160
polling_interval,
6261
);
6362
}
@@ -72,23 +71,23 @@ impl LazyFileBackedIndex {
7271

7372
/// Gets a synchronized `FileBackedIndex`. If the index wasn't provided on creation, we load it
7473
/// lazily on the first call of this method.
75-
pub async fn get(&self) -> MetastoreResult<Arc<Mutex<FileBackedIndexCell>>> {
74+
pub(crate) async fn get(&self) -> MetastoreResult<FileBackedIndexCell> {
7675
self.lazy_index
7776
.get_or_try_init(|| async move {
7877
let index = load_index(&*self.storage, &self.index_id).await?;
7978
let file_backed_index_cell = FileBackedIndexCell::new(index);
80-
let index_mutex = Arc::new(Mutex::new(file_backed_index_cell));
79+
let file_backed_index_writer = Arc::downgrade(&file_backed_index_cell.writer);
8180
// When the index is loaded lazily, the polling task is not started in the
8281
// constructor so we do it here when the index is actually loaded.
8382
if let Some(polling_interval) = self.polling_interval_opt {
8483
spawn_index_metadata_polling_task(
8584
self.storage.clone(),
8685
self.index_id.clone(),
87-
Arc::downgrade(&index_mutex),
86+
file_backed_index_writer,
8887
polling_interval,
8988
);
9089
}
91-
Ok(index_mutex)
90+
Ok(file_backed_index_cell)
9291
})
9392
.await
9493
.cloned()
@@ -98,20 +97,21 @@ impl LazyFileBackedIndex {
9897
async fn poll_index_metadata_once(
9998
storage: &dyn Storage,
10099
index_id: &str,
101-
index_mutex: &Mutex<FileBackedIndexCell>,
100+
index_writer: &Mutex<FileBackedIndexWriter>,
102101
) {
103-
todo!();
104-
// FIXME
105-
/*
106-
let mut locked_index = index_mutex.lock().await;
107-
if locked_index.flip_recently_modified_down() {
102+
let mut locked_index = index_writer.lock().await;
103+
if locked_index.upload_task.is_none() {
104+
return;
105+
}
106+
if locked_index.last_push.elapsed() < Duration::from_secs(30) {
108107
return;
109108
}
110109
let load_index_result = load_index(storage, index_id).await;
111110

112111
match load_index_result {
113112
Ok(index) => {
114-
*locked_index = index;
113+
locked_index.write_state = index;
114+
locked_index.publish();
115115
}
116116
Err(MetastoreError::NotFound(EntityKind::Index { .. })) => {
117117
// The index has been deleted by the file-backed metastore holding a reference to this
@@ -127,23 +127,22 @@ async fn poll_index_metadata_once(
127127
);
128128
}
129129
}
130-
*/
131130
}
132131

133132
fn spawn_index_metadata_polling_task(
134133
storage: Arc<dyn Storage>,
135134
index_id: IndexId,
136-
metastore_weak: Weak<Mutex<FileBackedIndexCell>>,
135+
metastore_weak: Weak<Mutex<FileBackedIndexWriter>>,
137136
polling_interval: Duration,
138137
) {
139138
tokio::task::spawn(async move {
140139
let mut interval = tokio::time::interval(polling_interval);
141140
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
142141
interval.tick().await; //< this is to prevent fetch right after the first population of the data.
143142

144-
while let Some(metadata_mutex) = metastore_weak.upgrade() {
143+
while let Some(metadata_writer) = metastore_weak.upgrade() {
145144
interval.tick().await;
146-
poll_index_metadata_once(&*storage, &index_id, &metadata_mutex).await;
145+
poll_index_metadata_once(&*storage, &index_id, &*metadata_writer).await;
147146
}
148147
});
149148
}

0 commit comments

Comments
 (0)