From ce49d738a625e08c67d5aefac36b8fa6116e3d60 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Mon, 6 Jan 2025 17:57:25 +0000 Subject: [PATCH] feat: use SnapshotOp --- Cargo.lock | 1 + influxdb3_cache/src/last_cache/provider.rs | 2 + influxdb3_wal/Cargo.toml | 3 + influxdb3_wal/src/create.rs | 6 +- influxdb3_wal/src/lib.rs | 114 +++-- influxdb3_wal/src/object_store.rs | 449 +++++++++++------- influxdb3_wal/src/serialize.rs | 1 - influxdb3_wal/src/snapshot_tracker.rs | 105 ++-- influxdb3_write/src/write_buffer/mod.rs | 19 +- influxdb3_write/src/write_buffer/plugins.rs | 2 + .../src/write_buffer/queryable_buffer.rs | 85 ++-- 11 files changed, 460 insertions(+), 327 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0569cccd780..5b36622bbd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3183,6 +3183,7 @@ dependencies = [ "schema", "serde", "serde_with", + "test-log", "thiserror 1.0.69", "tokio", ] diff --git a/influxdb3_cache/src/last_cache/provider.rs b/influxdb3_cache/src/last_cache/provider.rs index 57b74c5fbda..1186efde8d5 100644 --- a/influxdb3_cache/src/last_cache/provider.rs +++ b/influxdb3_cache/src/last_cache/provider.rs @@ -319,6 +319,8 @@ impl LastCacheProvider { } } WalOp::Catalog(_) => (), + WalOp::ForcedSnapshot(_) => (), + WalOp::Snapshot(_) => (), } } } diff --git a/influxdb3_wal/Cargo.toml b/influxdb3_wal/Cargo.toml index 3d46b44efdb..5691901a05b 100644 --- a/influxdb3_wal/Cargo.toml +++ b/influxdb3_wal/Cargo.toml @@ -34,3 +34,6 @@ tokio.workspace = true [lints] workspace = true + +[dev-dependencies] +test-log.workspace = true diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs index 3521f53d152..defdee06a96 100644 --- a/influxdb3_wal/src/create.rs +++ b/influxdb3_wal/src/create.rs @@ -18,7 +18,6 @@ pub fn wal_contents( max_timestamp_ns, wal_file_number: WalFileSequenceNumber::new(wal_file_number), ops: ops.into_iter().collect(), - snapshot: None, } } @@ -29,13 +28,14 @@ pub fn wal_contents_with_snapshot( ops: impl IntoIterator, snapshot: SnapshotDetails, ) -> WalContents { + let mut wal_ops: Vec = ops.into_iter().collect(); + wal_ops.push(WalOp::Snapshot(snapshot)); WalContents { persist_timestamp_ms: 0, min_timestamp_ns, max_timestamp_ns, wal_file_number: WalFileSequenceNumber::new(wal_file_number), - ops: ops.into_iter().collect(), - snapshot: Some(snapshot), + ops: wal_ops, } } diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index d190cb969e6..8bf5b04fae5 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -8,7 +8,6 @@ pub mod object_store; pub mod serialize; mod snapshot_tracker; -use crate::snapshot_tracker::SnapshotInfo; use async_trait::async_trait; use data_types::Timestamp; use hashbrown::HashMap; @@ -77,16 +76,25 @@ pub trait Wal: Debug + Send + Sync + 'static { /// permit to release when done. The caller is responsible for cleaning up the wal. async fn flush_buffer( &self, - ) -> Option<( - oneshot::Receiver, - SnapshotInfo, - OwnedSemaphorePermit, - )>; + ) -> Option<(oneshot::Receiver, OwnedSemaphorePermit)>; + + /// This is similar to flush buffer but it allows for snapshot to be done immediately rather + /// than waiting for query buffer to get to a certain capacity. It would be nicer to not + /// require an external interface to force flush the buffer however to decide whether to + /// snapshot immediately is based on query buffer's capacity which is not visible to snapshot + /// tracker which usually decides whether to snapshot or not. 
To bubble up the query buffer's + /// size or even to indicate that buffer is full we need a shared state and because flushing + /// the buffer is in hot path, this additional method acts as a compromise as this can be + /// called by making the decision to force the snapshot externally to `WalObjectStore` (that + /// implements this trait) + async fn force_flush_buffer( + &self, + ) -> Option<(oneshot::Receiver, OwnedSemaphorePermit)>; /// Removes any snapshot wal files async fn cleanup_snapshot( &self, - snapshot_details: SnapshotInfo, + snapshot_details: SnapshotDetails, snapshot_permit: OwnedSemaphorePermit, ); @@ -96,37 +104,29 @@ pub trait Wal: Debug + Send + Sync + 'static { /// Returns the last persisted wal file sequence number async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber; - /// Returns the snapshot info, if force snapshot is set it avoids checking - /// certain cases and returns snapshot info leaving only the last wal period - async fn snapshot_info_and_permit( - &self, - force_snapshot: bool, - ) -> Option<(SnapshotInfo, OwnedSemaphorePermit)>; + /// Get snapshot details based on conditions + async fn get_snapshot_details(&self, force_snapshot: bool) -> Option; /// Stop all writes to the WAL and flush the buffer to a WAL file. async fn shutdown(&self); async fn flush_buffer_and_cleanup_snapshot(self: Arc) { let maybe_snapshot = self.flush_buffer().await; - if let Some((snapshot_complete, snapshot_info, permit)) = maybe_snapshot { - self.cleanup_after_snapshot(snapshot_complete, snapshot_info, permit) - .await; + if let Some((snapshot_complete, permit)) = maybe_snapshot { + self.cleanup_after_snapshot(snapshot_complete, permit).await; } } async fn cleanup_after_snapshot( self: Arc, snapshot_complete: oneshot::Receiver, - snapshot_info: SnapshotInfo, permit: OwnedSemaphorePermit, ) { // handle snapshot cleanup outside of the flush loop let arcd_wal = Arc::clone(&self); tokio::spawn(async move { let snapshot_details = snapshot_complete.await.expect("snapshot failed"); - assert_eq!(snapshot_info.snapshot_details, snapshot_details); - - arcd_wal.cleanup_snapshot(snapshot_info, permit).await; + arcd_wal.cleanup_snapshot(snapshot_details, permit).await; }); } } @@ -136,22 +136,26 @@ pub trait Wal: Debug + Send + Sync + 'static { #[async_trait] pub trait WalFileNotifier: Debug + Send + Sync + 'static { /// Notify the handler that a new WAL file has been persisted with the given contents. - fn notify(&self, write: WalContents); - - /// Notify the handler that a new WAL file has been persisted with the given contents and tell - /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete. - async fn notify_and_snapshot( + async fn notify( &self, write: WalContents, - snapshot_details: SnapshotDetails, - ) -> oneshot::Receiver; - - /// Snapshot only, currently used to force the snapshot - async fn snapshot( - &self, - snapshot_details: SnapshotDetails, - ) -> oneshot::Receiver; - + do_snapshot: bool, + ) -> Option>; + + // /// Notify the handler that a new WAL file has been persisted with the given contents and tell + // /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete. 
+ // async fn notify_and_snapshot( + // &self, + // write: WalContents, + // snapshot_details: SnapshotDetails, + // ) -> oneshot::Receiver; + + // /// Snapshot only, currently used to force the snapshot + // async fn snapshot( + // &self, + // snapshot_details: SnapshotDetails, + // ) -> oneshot::Receiver; + // fn as_any(&self) -> &dyn Any; } @@ -251,6 +255,8 @@ impl Default for Gen1Duration { pub enum WalOp { Write(WriteBatch), Catalog(OrderedCatalogBatch), + ForcedSnapshot(SnapshotDetails), + Snapshot(SnapshotDetails), } impl PartialOrd for WalOp { @@ -273,6 +279,18 @@ impl Ord for WalOp { // For two Write ops, consider them equal (WalOp::Write(_), WalOp::Write(_)) => Ordering::Equal, + (WalOp::Write(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal, + (WalOp::Write(_), WalOp::Snapshot(_)) => Ordering::Equal, + (WalOp::Catalog(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal, + (WalOp::Catalog(_), WalOp::Snapshot(_)) => Ordering::Equal, + (WalOp::ForcedSnapshot(_), WalOp::Write(_)) => Ordering::Equal, + (WalOp::ForcedSnapshot(_), WalOp::Catalog(_)) => Ordering::Equal, + (WalOp::ForcedSnapshot(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal, + (WalOp::ForcedSnapshot(_), WalOp::Snapshot(_)) => Ordering::Equal, + (WalOp::Snapshot(_), WalOp::Write(_)) => Ordering::Equal, + (WalOp::Snapshot(_), WalOp::Catalog(_)) => Ordering::Equal, + (WalOp::Snapshot(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal, + (WalOp::Snapshot(_), WalOp::Snapshot(_)) => Ordering::Equal, } } } @@ -282,6 +300,8 @@ impl WalOp { match self { WalOp::Write(w) => Some(w), WalOp::Catalog(_) => None, + WalOp::ForcedSnapshot(_) => None, + WalOp::Snapshot(_) => None, } } @@ -289,6 +309,8 @@ impl WalOp { match self { WalOp::Write(_) => None, WalOp::Catalog(c) => Some(&c.catalog), + WalOp::ForcedSnapshot(_) => None, + WalOp::Snapshot(_) => None, } } } @@ -837,13 +859,29 @@ pub struct WalContents { pub wal_file_number: WalFileSequenceNumber, /// The operations contained in the WAL file pub ops: Vec, - /// If present, the buffer should be snapshot after the contents of this file are loaded. - pub snapshot: Option, } impl WalContents { pub fn is_empty(&self) -> bool { - self.ops.is_empty() && self.snapshot.is_none() + self.ops.is_empty() + } + + pub fn add_snapshot_op(&mut self, snapshot_details: SnapshotDetails) { + self.ops.push(WalOp::Snapshot(snapshot_details)); + } + + pub fn add_force_snapshot_op(&mut self, snapshot_details: SnapshotDetails) { + self.ops.push(WalOp::ForcedSnapshot(snapshot_details)); + } + + pub fn find_snapshot_details(&self) -> Option { + // There should be only one snapshot in a wal file? 
+ // should assert that + self.ops.iter().find_map(|item| match item { + WalOp::Snapshot(details) => Some(*details), + WalOp::ForcedSnapshot(details) => Some(*details), + _ => None, + }) } } @@ -912,6 +950,8 @@ pub struct SnapshotDetails { pub snapshot_sequence_number: SnapshotSequenceNumber, /// All chunks with data before this time can be snapshot and persisted pub end_time_marker: i64, + /// All wal files with a sequence number >= to this can be deleted once snapshotting is complete + pub first_wal_sequence_number: WalFileSequenceNumber, /// All wal files with a sequence number <= to this can be deleted once snapshotting is complete pub last_wal_sequence_number: WalFileSequenceNumber, } diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 8b7cda2df86..e08929ae112 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -1,10 +1,11 @@ use crate::serialize::verify_file_type_and_deserialize; -use crate::snapshot_tracker::{SnapshotInfo, SnapshotTracker, WalPeriod}; +use crate::snapshot_tracker::{SnapshotTracker, WalPeriod}; use crate::{ background_wal_flush, OrderedCatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, WalConfig, WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch, }; use bytes::Bytes; +use core::panic; use data_types::Timestamp; use futures_util::stream::StreamExt; use hashbrown::HashMap; @@ -81,6 +82,7 @@ impl WalObjectStore { database_to_write_batch: Default::default(), catalog_batches: vec![], write_op_responses: vec![], + snapshot_op: None, }, SnapshotTracker::new( config.snapshot_size, @@ -94,15 +96,24 @@ impl WalObjectStore { /// Loads the WAL files in order from object store, calling the file notifier on each one and /// populating the snapshot tracker with the WAL periods. pub async fn replay(&self) -> crate::Result<()> { + let (last_wal_sequence_number, last_snapshot_sequence_number) = { + let snapshot_tracker = &self.flush_buffer.lock().await.snapshot_tracker; + ( + snapshot_tracker.last_wal_sequence_number(), + snapshot_tracker.last_snapshot_sequence_number(), + ) + }; + + debug!( + ?last_wal_sequence_number, + ?last_snapshot_sequence_number, + "last wal and snapshot seq num" + ); + + // should we load only paths after last_wal_sequence_number? 
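        // Hedged sketch answering the question above (reviewer note, not applied in
        // this patch): the cut-off could be pushed down onto the paths themselves,
        // assuming a hypothetical `wal_file_number_from_path` helper that parses the
        // sequence number back out of the object store path:
        //
        //     let paths: Vec<_> = paths
        //         .into_iter()
        //         .filter(|p| wal_file_number_from_path(p) > last_wal_sequence_number)
        //         .collect();
        //
        // As written, the same cut-off is applied after deserialization via the
        // `wal_contents.wal_file_number > last_wal_sequence_number` check below.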
let paths = self.load_existing_wal_file_paths().await?; - let last_snapshot_sequence_number = { - self.flush_buffer - .lock() - .await - .snapshot_tracker - .last_snapshot_sequence_number() - }; + debug!(num_paths = ?paths.len(), ">>> wal paths"); async fn get_contents( object_store: Arc, @@ -121,49 +132,61 @@ impl WalObjectStore { for wal_contents in replay_tasks { let wal_contents = wal_contents.await??; - // add this to the snapshot tracker, so we know what to clear out later if the replay - // was a wal file that had a snapshot - self.flush_buffer - .lock() - .await - .replay_wal_period(WalPeriod::new( - wal_contents.wal_file_number, - Timestamp::new(wal_contents.min_timestamp_ns), - Timestamp::new(wal_contents.max_timestamp_ns), - )); - - match wal_contents.snapshot { - // This branch uses so much time - None => self.file_notifier.notify(wal_contents), - Some(snapshot_details) => { - let snapshot_info = { - let mut buffer = self.flush_buffer.lock().await; - - match buffer.snapshot_tracker.snapshot(false) { - None => None, - Some(info) => { - let semaphore = Arc::clone(&buffer.snapshot_semaphore); - let permit = semaphore.acquire_owned().await.unwrap(); - Some((info, permit)) - } + if wal_contents.wal_file_number > last_wal_sequence_number { + debug!( + ?wal_contents, + ">>> inside replay trying to load wal contents" + ); + { + // add this to the snapshot tracker, so we know what to clear out later if the replay + // was a wal file that had a snapshot + self.flush_buffer + .lock() + .await + .replay_wal_period(WalPeriod::new( + wal_contents.wal_file_number, + Timestamp::new(wal_contents.min_timestamp_ns), + Timestamp::new(wal_contents.max_timestamp_ns), + )); + } + { + // this will snapshot - if snapshot op is present. if we construct the + // semaphore outside then it's constructed needlessly but that's what we + // do for now + let buffer = self.flush_buffer.lock().await; + let mut maybe_permit_and_details = None; + let mut do_snapshot = false; + + if let Some(details) = wal_contents.find_snapshot_details() { + if details.snapshot_sequence_number > last_snapshot_sequence_number { + let permit = buffer.acquire_snapshot_permit().await; + maybe_permit_and_details = Some((permit, details)); + do_snapshot = true; } - }; - if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number { - // Instead just notify about the WAL, as this snapshot has already been taken - // and WAL files may have been cleared. - self.file_notifier.notify(wal_contents); - } else { - let snapshot_done = self - .file_notifier - .notify_and_snapshot(wal_contents, snapshot_details) - .await; - let details = snapshot_done.await.unwrap(); - assert_eq!(snapshot_details, details); } - // if the info is there, we have wal files to delete - if let Some((snapshot_info, snapshot_permit)) = snapshot_info { - self.cleanup_snapshot(snapshot_info, snapshot_permit).await; + // if maybe_permit_and_details.is_none() { + // // Not creating a wal op to indicate we're snapshotting, as we're already + // // in the replay method anyway. 
We cannot create wal files within here + // if let Some(snapshot_info) = buffer.snapshot_tracker.snapshot(false) { + // let permit = buffer.acquire_snapshot_permit().await; + // maybe_permit_and_details = + // Some((permit, snapshot_info.snapshot_details)); + // } + // } + + // shouldn't have snapshot receiver if permit/details are none + let maybe_snapshot_receiver = + self.file_notifier.notify(wal_contents, do_snapshot).await; + if let Some(receiver) = maybe_snapshot_receiver { + let _snapshot_details = receiver.await.unwrap(); + } + + if let Some((permit, details)) = maybe_permit_and_details { + // details here will either match what came through wal_contents in the + // file or from snapshot call. So it's safe to use this detail to remove + // any wal files that have been snapshotted + self.cleanup_snapshot(details, permit).await; } } } @@ -178,10 +201,10 @@ impl WalObjectStore { self.flush_buffer.lock().await.wal_buffer.is_shutdown = true; // do the flush and wait for the snapshot if that's running - if let Some((snapshot_done, snapshot_info, snapshot_permit)) = self.flush_buffer().await { + if let Some((snapshot_done, snapshot_permit)) = self.flush_buffer(false).await { let snapshot_details = snapshot_done.await.expect("snapshot should complete"); - assert_eq!(snapshot_info.snapshot_details, snapshot_details); - self.remove_snapshot_wal_files(snapshot_info, snapshot_permit) + // assert_eq!(snapshot_info.snapshot_details, snapshot_details); + self.remove_snapshot_wal_files(snapshot_details, snapshot_permit) .await; } } @@ -216,26 +239,20 @@ impl WalObjectStore { async fn flush_buffer( &self, - ) -> Option<( - oneshot::Receiver, - SnapshotInfo, - OwnedSemaphorePermit, - )> { - let (wal_contents, responses, snapshot) = { + force_snapshot: bool, + ) -> Option<(oneshot::Receiver, OwnedSemaphorePermit)> { + let (wal_contents, responses, semaphore) = { let mut flush_buffer = self.flush_buffer.lock().await; - if flush_buffer.wal_buffer.is_empty() { - return None; - } + flush_buffer - .flush_buffer_into_contents_and_responses() - .await + .flush_buffer_into_contents_and_responses(force_snapshot) + .await? 
}; info!( n_ops = %wal_contents.ops.len(), min_timestamp_ns = %wal_contents.min_timestamp_ns, max_timestamp_ns = %wal_contents.max_timestamp_ns, wal_file_number = %wal_contents.wal_file_number, - snapshot_details = ?wal_contents.snapshot, "flushing WAL buffer to object store" ); @@ -280,36 +297,48 @@ impl WalObjectStore { } } - debug!(snapshot_info = ?wal_contents.snapshot, ">>> snapshot info"); + debug!(?wal_contents, ">>> WAL contents for snapshotting"); // now that we've persisted this latest notify and start the snapshot, if set - let snapshot_response = match wal_contents.snapshot { - Some(snapshot_details) => { - info!(?snapshot_details, "snapshotting wal"); - let snapshot_done = self - .file_notifier - .notify_and_snapshot(wal_contents, snapshot_details) - .await; - let (snapshot_info, snapshot_permit) = - snapshot.expect("snapshot should be set when snapshot details are set"); - Some((snapshot_done, snapshot_info, snapshot_permit)) - } - None => { - debug!( - "notify sent to buffer for wal file {}", - wal_contents.wal_file_number.as_u64() - ); - self.file_notifier.notify(wal_contents); - None - } - }; + let snapshot_done = self.file_notifier.notify(wal_contents, true).await; + // let snapshot_response = match wal_contents.snapshot { + // Some(snapshot_details) => { + // info!(?snapshot_details, "snapshotting wal"); + // let snapshot_done = self + // .file_notifier + // .notify_and_snapshot(wal_contents, snapshot_details) + // .await; + // let (snapshot_info, snapshot_permit) = + // snapshot.expect("snapshot should be set when snapshot details are set"); + // Some((snapshot_done, snapshot_info, snapshot_permit)) + // } + // None => { + // debug!( + // "notify sent to buffer for wal file {}", + // wal_contents.wal_file_number.as_u64() + // ); + // self.file_notifier.notify(wal_contents); + // None + // } + // }; // send all the responses back to clients for response in responses { let _ = response.send(WriteResult::Success(())); } - snapshot_response + match (snapshot_done, semaphore) { + (None, None) => None, + (None, Some(_)) => panic!("snapshot permit present but no snapshot"), + (Some(_), None) => panic!("snapshot done without permit"), + (Some(receiver), Some(permit)) => Some((receiver, permit)), + } + // match (snapshot_done, semaphore) { + // (None, None) => None, + // (None, Some(_)) => panic!("snapshot permit present but no snapshot"), + // (Some(_), None) => panic!("snapshot done without permit"), + // (Some(receiver), Some((info, permit))) => Some((receiver, info, permit)), + // } } async fn load_existing_wal_file_paths(&self) -> crate::Result> { @@ -342,15 +371,26 @@ impl WalObjectStore { async fn remove_snapshot_wal_files( &self, - snapshot_info: SnapshotInfo, + snapshot_details: SnapshotDetails, snapshot_permit: OwnedSemaphorePermit, ) { - for period in snapshot_info.wal_periods { - let path = wal_path(&self.host_identifier_prefix, period.wal_file_number); + let start = snapshot_details.first_wal_sequence_number.as_u64(); + let end = snapshot_details.last_wal_sequence_number.as_u64(); + for period in start..=end { + let path = wal_path( + &self.host_identifier_prefix, + WalFileSequenceNumber::new(period), + ); + debug!(?path, ">>> deleting wal file"); loop { match self.object_store.delete(&path).await { Ok(_) => break, + Err(object_store::Error::NotFound { path, source }) => { + // If we attempt to remove a file and file does not exist it is ok to move + // on. 
This can happen if wal files sequence is missing a file inbetween + debug!(%path, %source, "cannot find path to delete wal file"); + } Err(object_store::Error::Generic { store, source }) => { error!(%store, %source, "error deleting wal file"); // hopefully just a temporary error, keep trying until we succeed @@ -383,17 +423,19 @@ impl Wal for WalObjectStore { async fn flush_buffer( &self, - ) -> Option<( - oneshot::Receiver, - SnapshotInfo, - OwnedSemaphorePermit, - )> { - self.flush_buffer().await + ) -> Option<(oneshot::Receiver, OwnedSemaphorePermit)> { + self.flush_buffer(false).await + } + + async fn force_flush_buffer( + &self, + ) -> Option<(oneshot::Receiver, OwnedSemaphorePermit)> { + self.flush_buffer(true).await } async fn cleanup_snapshot( &self, - snapshot_info: SnapshotInfo, + snapshot_info: SnapshotDetails, snapshot_permit: OwnedSemaphorePermit, ) { self.remove_snapshot_wal_files(snapshot_info, snapshot_permit) @@ -416,12 +458,9 @@ impl Wal for WalObjectStore { .last_snapshot_sequence_number() } - async fn snapshot_info_and_permit( - &self, - force_snapshot: bool, - ) -> Option<(SnapshotInfo, OwnedSemaphorePermit)> { + async fn get_snapshot_details(&self, force_snapshot: bool) -> Option { let mut buff = self.flush_buffer.lock().await; - buff.get_snapshot_info_and_permit(force_snapshot).await + buff.get_snapshot_details(force_snapshot) } async fn shutdown(&self) { @@ -456,48 +495,80 @@ impl FlushBuffer { self.snapshot_tracker.add_wal_period(wal_period); } - async fn get_snapshot_info_and_permit( - &mut self, - force_snapshot: bool, - ) -> Option<(SnapshotInfo, OwnedSemaphorePermit)> { - let maybe_snapshot = self.snapshot_tracker.snapshot(force_snapshot); - - match maybe_snapshot { - Some(snapshot_info) => { - debug!(?snapshot_info, ">>> snapshot info"); - - Some((snapshot_info, self.acquire_snapshot_permit().await)) - } - None => None, - } + fn get_snapshot_details(&mut self, force_snapshot: bool) -> Option { + self.snapshot_tracker.snapshot(force_snapshot) } /// Converts the wal_buffer into contents and resets it. Returns the channels waiting for /// responses. If a snapshot should occur with this flush, a semaphore permit is also returned. 
async fn flush_buffer_into_contents_and_responses( &mut self, - ) -> ( + force_snapshot: bool, + ) -> Option<( WalContents, Vec>, - Option<(SnapshotInfo, OwnedSemaphorePermit)>, - ) { + Option, + )> { + + // buffer empty, snapshot none => return None + // buffer present, snapshot none => no snapshot op in wal content semaphore + // buffer empty, snapshot present => Some(wal_contents with snapshot and semaphore - this + // happens only when forcing snapshot) + // buffer present and snapshot present => wal_content has snapshot op + // in all these cases we want snapshot to appear at the end + // convert into wal contents and responses and capture if a snapshot should be taken - let (mut wal_contents, responses) = self.flush_buffer_with_responses(); - self.snapshot_tracker.add_wal_period(WalPeriod { - wal_file_number: wal_contents.wal_file_number, - min_time: Timestamp::new(wal_contents.min_timestamp_ns), - max_time: Timestamp::new(wal_contents.max_timestamp_ns), - }); + if !self.wal_buffer.is_empty() { + // this caters for buffer present and snapshot none or present + let (mut wal_contents, responses) = self.flush_buffer_with_responses(); + self.snapshot_tracker.add_wal_period(WalPeriod { + wal_file_number: wal_contents.wal_file_number, + min_time: Timestamp::new(wal_contents.min_timestamp_ns), + max_time: Timestamp::new(wal_contents.max_timestamp_ns), + }); - let snapshot = match self.get_snapshot_info_and_permit(false).await { - Some(info) => { - wal_contents.snapshot = Some(info.0.snapshot_details); - Some(info) + let semaphore = match self.get_snapshot_details(force_snapshot) { + Some(details) => { + debug!(?details, ">>> adding semaphore snapshot to wal content"); + if force_snapshot { + wal_contents.add_force_snapshot_op(details); + + } else { + wal_contents.add_snapshot_op(details); + } + let semaphore = self.acquire_snapshot_permit().await; + Some(semaphore) + } + None => None, + }; + Some((wal_contents, responses, semaphore)) + } else if force_snapshot { + // buffer empty and forcing snapshot + match self.get_snapshot_details(force_snapshot) { + Some(details) => { + let (mut wal_contents, responses) = self.flush_buffer_with_responses(); + self.snapshot_tracker.add_wal_period(WalPeriod { + wal_file_number: wal_contents.wal_file_number, + min_time: Timestamp::new(wal_contents.min_timestamp_ns), + max_time: Timestamp::new(wal_contents.max_timestamp_ns), + }); + debug!(?details, ">>> adding semaphore snapshot to wal content"); + if force_snapshot { + wal_contents.add_force_snapshot_op(details); + + } else { + wal_contents.add_snapshot_op(details); + } + let semaphore = self.acquire_snapshot_permit().await; + Some((wal_contents, responses, Some(semaphore))) + } + None => None, } - None => None, - }; - (wal_contents, responses, snapshot) + } else { + None + } + } async fn acquire_snapshot_permit(&self) -> OwnedSemaphorePermit { @@ -519,6 +590,7 @@ impl FlushBuffer { database_to_write_batch: Default::default(), write_op_responses: vec![], catalog_batches: vec![], + snapshot_op: None, }; std::mem::swap(&mut self.wal_buffer, &mut new_buffer); @@ -543,11 +615,14 @@ struct WalBuffer { database_to_write_batch: HashMap, WriteBatch>, catalog_batches: Vec, write_op_responses: Vec>, + snapshot_op: Option, } impl WalBuffer { fn is_empty(&self) -> bool { - self.database_to_write_batch.is_empty() && self.catalog_batches.is_empty() + self.database_to_write_batch.is_empty() + && self.catalog_batches.is_empty() + && self.snapshot_op.is_none() } } @@ -590,6 +665,10 @@ impl WalBuffer { 
WalOp::Catalog(catalog_batch) => { self.catalog_batches.push(catalog_batch); } + WalOp::ForcedSnapshot(_) => { + // + } + WalOp::Snapshot(_) => {} } Ok(()) @@ -629,6 +708,9 @@ impl WalBuffer { ops.extend(self.catalog_batches.into_iter().map(WalOp::Catalog)); ops.extend(self.database_to_write_batch.into_values().map(WalOp::Write)); + if let Some(snapshot_op) = self.snapshot_op { + ops.push(snapshot_op); + }; ops.sort(); @@ -639,7 +721,6 @@ impl WalBuffer { max_timestamp_ns, wal_file_number: self.wal_file_sequence_number, ops, - snapshot: None, }, self.write_op_responses, ) @@ -684,7 +765,7 @@ mod tests { use std::any::Any; use tokio::sync::oneshot::Receiver; - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 2))] async fn write_flush_delete_and_load() { let time_provider: Arc = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); @@ -792,7 +873,7 @@ mod tests { wal.buffer_op_unconfirmed(op2.clone()).await.unwrap(); // create wal file 1 - let ret = wal.flush_buffer().await; + let ret = wal.flush_buffer(false).await; assert!(ret.is_none()); let file_1_contents = create::wal_contents( (1, 62_000_000_000, 1), @@ -860,7 +941,7 @@ mod tests { // create wal file 2 wal.buffer_op_unconfirmed(op2.clone()).await.unwrap(); - assert!(wal.flush_buffer().await.is_none()); + assert!(wal.flush_buffer(false).await.is_none()); let file_2_contents = create::wal_contents( (62_000_000_000, 62_000_000_000, 2), @@ -966,28 +1047,15 @@ mod tests { }); wal.buffer_op_unconfirmed(op3.clone()).await.unwrap(); - let (snapshot_done, snapshot_info, snapshot_permit) = wal.flush_buffer().await.unwrap(); - let expected_info = SnapshotInfo { - snapshot_details: SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(1), - end_time_marker: 120000000000, - last_wal_sequence_number: WalFileSequenceNumber(2), - }, - wal_periods: vec![ - WalPeriod { - wal_file_number: WalFileSequenceNumber(1), - min_time: Timestamp::new(1), - max_time: Timestamp::new(62000000000), - }, - WalPeriod { - wal_file_number: WalFileSequenceNumber(2), - min_time: Timestamp::new(62000000000), - max_time: Timestamp::new(62000000000), - }, - ], + let (snapshot_done, snapshot_permit) = wal.flush_buffer(false).await.unwrap(); + let expected_details = SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), + end_time_marker: 120000000000, + first_wal_sequence_number: WalFileSequenceNumber(1), + last_wal_sequence_number: WalFileSequenceNumber(2), }; - assert_eq!(expected_info, snapshot_info); - snapshot_done.await.unwrap(); + let details = snapshot_done.await.unwrap(); + assert_eq!(expected_details, details); let file_3_contents = create::wal_contents_with_snapshot( (128_000_000_000, 128_000_000_000, 3), @@ -1026,6 +1094,7 @@ mod tests { SnapshotDetails { snapshot_sequence_number: SnapshotSequenceNumber::new(1), end_time_marker: 120_000000000, + first_wal_sequence_number: WalFileSequenceNumber(1), last_wal_sequence_number: WalFileSequenceNumber(2), }, ); @@ -1037,10 +1106,10 @@ mod tests { let expected_writes = vec![file_1_contents, file_2_contents, file_3_contents.clone()]; assert_eq!(*notified_writes, expected_writes); let details = notifier.snapshot_details.lock(); - assert_eq!(details.unwrap(), expected_info.snapshot_details); + assert_eq!(details.unwrap(), expected_details); } - wal.remove_snapshot_wal_files(snapshot_info, snapshot_permit) + wal.remove_snapshot_wal_files(details, snapshot_permit) .await; // test that replay now 
only has file 3 @@ -1065,8 +1134,7 @@ mod tests { .unwrap(); let notified_writes = replay_notifier.notified_writes.lock(); assert_eq!(*notified_writes, vec![file_3_contents.clone()]); - let snapshot_details = replay_notifier.snapshot_details.lock(); - assert_eq!(*snapshot_details, file_3_contents.snapshot); + // let snapshot_details = replay_notifier.snapshot_details.lock(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1091,7 +1159,7 @@ mod tests { None, ); - assert!(wal.flush_buffer().await.is_none()); + assert!(wal.flush_buffer(false).await.is_none()); let notifier = notifier.as_any().downcast_ref::().unwrap(); assert!(notifier.notified_writes.lock().is_empty()); @@ -1111,6 +1179,7 @@ mod tests { database_to_write_batch: Default::default(), catalog_batches: vec![], write_op_responses: vec![], + snapshot_op: None, }; time_provider.set(Time::from_timestamp_millis(1234).unwrap()); let (wal_contents, _) = wal_buffer.into_wal_contents_and_responses(); @@ -1125,32 +1194,50 @@ mod tests { #[async_trait] impl WalFileNotifier for TestNotifier { - fn notify(&self, write: WalContents) { - self.notified_writes.lock().push(write); - } - - async fn notify_and_snapshot( + async fn notify( &self, - write: WalContents, - snapshot_details: SnapshotDetails, - ) -> Receiver { - self.notified_writes.lock().push(write); - *self.snapshot_details.lock() = Some(snapshot_details); - - let (sender, receiver) = tokio::sync::oneshot::channel(); - tokio::spawn(async move { - sender.send(snapshot_details).unwrap(); - }); - - receiver + wal_contents: WalContents, + do_snapshot: bool, + ) -> Option> { + self.notified_writes.lock().push(wal_contents.clone()); + if do_snapshot { + let snapshot_details = wal_contents.find_snapshot_details(); + if let Some(snapshot_details) = snapshot_details { + *self.snapshot_details.lock() = Some(snapshot_details); + + let (sender, receiver) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + sender.send(snapshot_details).unwrap(); + }); + + return Some(receiver); + } + } + None } + // async fn notify_and_snapshot( + // &self, + // write: WalContents, + // snapshot_details: SnapshotDetails, + // ) -> Receiver { + // self.notified_writes.lock().push(write); + // *self.snapshot_details.lock() = Some(snapshot_details); + // + // let (sender, receiver) = tokio::sync::oneshot::channel(); + // tokio::spawn(async move { + // sender.send(snapshot_details).unwrap(); + // }); + // + // receiver + // } + fn as_any(&self) -> &dyn Any { self } - async fn snapshot(&self, _snapshot_details: SnapshotDetails) -> Receiver { - unimplemented!() - } + // async fn snapshot(&self, _snapshot_details: SnapshotDetails) -> Receiver { + // unimplemented!() + // } } } diff --git a/influxdb3_wal/src/serialize.rs b/influxdb3_wal/src/serialize.rs index ddee1b4edda..2bbac6af180 100644 --- a/influxdb3_wal/src/serialize.rs +++ b/influxdb3_wal/src/serialize.rs @@ -132,7 +132,6 @@ mod tests { min_time_ns: 0, max_time_ns: 10, })], - snapshot: None, }; let bytes = serialize_to_file_bytes(&contents).unwrap(); diff --git a/influxdb3_wal/src/snapshot_tracker.rs b/influxdb3_wal/src/snapshot_tracker.rs index 1824aeed2c4..c5b075703fc 100644 --- a/influxdb3_wal/src/snapshot_tracker.rs +++ b/influxdb3_wal/src/snapshot_tracker.rs @@ -57,7 +57,7 @@ impl SnapshotTracker { /// In the case of data coming in for future times, we will be unable to snapshot older data. /// Over time this will back up the WAL. 
To guard against this, if the number of WAL periods /// is >= 3x the snapshot size, snapshot everything up to the last period. - pub(crate) fn snapshot(&mut self, force_snapshot: bool) -> Option { + pub(crate) fn snapshot(&mut self, force_snapshot: bool) -> Option { debug!( wal_periods = ?self.wal_periods, wal_periods_len = ?self.wal_periods.len(), @@ -107,7 +107,7 @@ impl SnapshotTracker { true } - pub(crate) fn snapshot_in_order_wal_periods(&mut self) -> Option { + pub(crate) fn snapshot_in_order_wal_periods(&mut self) -> Option { let t = self.wal_periods.last().unwrap().max_time; // round the last timestamp down to the gen1_duration let t = t - (t.get() % self.gen1_duration.as_nanos()); @@ -121,24 +121,30 @@ impl SnapshotTracker { .cloned() .collect::>(); debug!(?periods_to_snapshot, ">>> periods to snapshot"); + let first_wal_file_number = periods_to_snapshot + .iter() + .peekable() + .peek() + .map(|period| period.wal_file_number)?; + + let last_wal_file_number = periods_to_snapshot + .iter() + .last() + .map(|period| period.wal_file_number)?; + + // remove the wal periods and return the snapshot details + self.wal_periods + .retain(|p| p.wal_file_number > last_wal_file_number); - periods_to_snapshot.last().cloned().map(|period| { - // remove the wal periods and return the snapshot details - self.wal_periods - .retain(|p| p.wal_file_number > period.wal_file_number); - - SnapshotInfo { - snapshot_details: SnapshotDetails { - snapshot_sequence_number: self.increment_snapshot_sequence_number(), - end_time_marker: t.get(), - last_wal_sequence_number: period.wal_file_number, - }, - wal_periods: periods_to_snapshot, - } + Some(SnapshotDetails { + snapshot_sequence_number: self.increment_snapshot_sequence_number(), + end_time_marker: t.get(), + first_wal_sequence_number: first_wal_file_number, + last_wal_sequence_number: last_wal_file_number, }) } - fn snapshot_all(&mut self) -> Option { + fn snapshot_all(&mut self) -> Option { let n_periods_to_take = self.wal_periods.len() - 1; let wal_periods: Vec = self.wal_periods.drain(0..n_periods_to_take).collect(); let max_time = wal_periods @@ -148,17 +154,16 @@ impl SnapshotTracker { .unwrap(); let t = max_time - (max_time.get() % self.gen1_duration.as_nanos()) + self.gen1_duration.as_nanos(); - let last_wal_sequence_number = wal_periods.last().unwrap().wal_file_number; + let first_wal_sequence_number = wal_periods.first()?.wal_file_number; + let last_wal_sequence_number = wal_periods.last()?.wal_file_number; let snapshot_details = SnapshotDetails { snapshot_sequence_number: self.increment_snapshot_sequence_number(), end_time_marker: t.get(), + first_wal_sequence_number, last_wal_sequence_number, }; - Some(SnapshotInfo { - snapshot_details, - wal_periods, - }) + Some(snapshot_details) } /// The number of wal periods we need to see before we attempt a snapshot. This is to ensure that we @@ -183,11 +188,11 @@ impl SnapshotTracker { } } -#[derive(Debug, Eq, PartialEq)] -pub struct SnapshotInfo { - pub snapshot_details: SnapshotDetails, - pub(crate) wal_periods: Vec, -} +// #[derive(Debug, Eq, PartialEq)] +// pub struct SnapshotInfo { +// pub snapshot_details: SnapshotDetails, +// pub(crate) wal_periods: Vec, +// } /// A struct that represents a period of time in the WAL. This is used to track the data timestamps /// and sequence numbers for each period of the WAL (which will be a file in object store, if enabled). 
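Since `SnapshotDetails` now carries both `first_wal_sequence_number` and `last_wal_sequence_number`, the cleanup side no longer needs the `Vec<WalPeriod>` that the retired `SnapshotInfo` carried: the deletable file range falls out of the details alone. A minimal sketch of that derivation, using only types already in this crate (the helper name `deletable_wal_file_numbers` is illustrative and not part of the patch):

use crate::{SnapshotDetails, WalFileSequenceNumber};

/// Illustrative helper: every WAL file number covered by a snapshot, i.e. the
/// same start..=end range that `remove_snapshot_wal_files` walks when deleting.
fn deletable_wal_file_numbers(
    details: SnapshotDetails,
) -> impl Iterator<Item = WalFileSequenceNumber> {
    (details.first_wal_sequence_number.as_u64()..=details.last_wal_sequence_number.as_u64())
        .map(WalFileSequenceNumber::new)
}

Cleanup then reduces to deleting `wal_path(prefix, n)` for each yielded number, tolerating files that are already gone.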
@@ -260,13 +265,11 @@ mod tests { tracker.add_wal_period(p3.clone()); assert_eq!( tracker.snapshot(false), - Some(SnapshotInfo { - snapshot_details: SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(1), - end_time_marker: 120_000000000, - last_wal_sequence_number: WalFileSequenceNumber::new(2) - }, - wal_periods: vec![p1, p2], + Some(SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), + end_time_marker: 120_000000000, + first_wal_sequence_number: WalFileSequenceNumber::new(1), + last_wal_sequence_number: WalFileSequenceNumber::new(2), }) ); tracker.add_wal_period(p4.clone()); @@ -274,13 +277,11 @@ mod tests { tracker.add_wal_period(p5.clone()); assert_eq!( tracker.snapshot(false), - Some(SnapshotInfo { - snapshot_details: SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(2), - end_time_marker: 240_000000000, - last_wal_sequence_number: WalFileSequenceNumber::new(3) - }, - wal_periods: vec![p3] + Some(SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(2), + end_time_marker: 240_000000000, + first_wal_sequence_number: WalFileSequenceNumber::new(3), + last_wal_sequence_number: WalFileSequenceNumber::new(3) }) ); @@ -289,13 +290,11 @@ mod tests { tracker.add_wal_period(p6.clone()); assert_eq!( tracker.snapshot(false), - Some(SnapshotInfo { - snapshot_details: SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(3), - end_time_marker: 360_000000000, - last_wal_sequence_number: WalFileSequenceNumber::new(5) - }, - wal_periods: vec![p4, p5] + Some(SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(3), + end_time_marker: 360_000000000, + first_wal_sequence_number: WalFileSequenceNumber::new(4), + last_wal_sequence_number: WalFileSequenceNumber::new(5) }) ); @@ -348,13 +347,11 @@ mod tests { assert_eq!( tracker.snapshot(false), - Some(SnapshotInfo { - snapshot_details: SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(1), - end_time_marker: 360000000000, - last_wal_sequence_number: WalFileSequenceNumber::new(5) - }, - wal_periods: vec![p1, p2, p3, p4, p5] + Some(SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), + end_time_marker: 360000000000, + first_wal_sequence_number: WalFileSequenceNumber::new(1), + last_wal_sequence_number: WalFileSequenceNumber::new(5), }) ); } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 6f00f4ff9ca..218a612be26 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -196,6 +196,7 @@ impl WriteBufferImpl { let persisted_snapshots = persister .load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START) .await?; + debug!(?persisted_snapshots, ">>> all persisted snapshots"); let last_wal_sequence_number = persisted_snapshots .first() .map(|s| s.wal_file_sequence_number); @@ -400,21 +401,7 @@ async fn check_mem_and_force_snapshot( memory_threshold_bytes, "forcing snapshot" ); let wal = Arc::clone(&write_buffer.wal); - let maybe_snapshot_info = wal.snapshot_info_and_permit(true).await; - match maybe_snapshot_info { - Some((snapshot_info, permit)) => { - debug!(?snapshot_info, "Running snapshot"); - let snapshot_done = write_buffer - .buffer - .snapshot(snapshot_info.snapshot_details) - .await; - wal.cleanup_after_snapshot(snapshot_done, snapshot_info, permit) - .await; - } - None => { - debug!("No info to snapshot"); - } - } + wal.force_flush_buffer().await; } } @@ -1769,7 +1756,7 @@ mod tests { /// This 
is the reproducer for [#25277][see] /// /// [see]: https://github.com/influxdata/influxdb/issues/25277 - #[tokio::test] + #[test_log::test(tokio::test)] async fn writes_not_dropped_on_snapshot() { let obj_store: Arc = Arc::new(InMemory::new()); let wal_config = WalConfig { diff --git a/influxdb3_write/src/write_buffer/plugins.rs b/influxdb3_write/src/write_buffer/plugins.rs index 0ba393c5b7f..fed04371cc1 100644 --- a/influxdb3_write/src/write_buffer/plugins.rs +++ b/influxdb3_write/src/write_buffer/plugins.rs @@ -165,6 +165,8 @@ mod python_plugin { } } WalOp::Catalog(_) => {} + WalOp::ForcedSnapshot(_) => {} + WalOp::Snapshot(_) => {} } } } diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 6f761af4b54..4e222384d95 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -36,9 +36,9 @@ use parking_lot::{Mutex, RwLock}; use parquet::format::FileMetaData; use schema::sort::SortKey; use schema::Schema; -use std::any::Any; use std::sync::Arc; use std::time::Duration; +use std::any::Any; use tokio::sync::oneshot::Receiver; use tokio::sync::{broadcast, oneshot}; @@ -241,9 +241,10 @@ impl QueryableBuffer { } info!( - "persisting {} chunks for wal number {}", + "persisting {} chunks for wal number {} and snapshot seq num {}", persist_jobs.len(), snapshot_details.last_wal_sequence_number.as_u64(), + snapshot_details.snapshot_sequence_number.as_u64(), ); let (persisted_snapshot, cache_notifiers) = persist_parquet_files_and_get_summary( &persister, @@ -456,38 +457,48 @@ async fn persist_parquet_files_and_get_summary( #[async_trait] impl WalFileNotifier for QueryableBuffer { - fn notify(&self, write: WalContents) { + async fn notify( + &self, + wal_contents: WalContents, + do_snapshot: bool, + ) -> Option> { if let Some(sender) = self.plugin_event_tx.lock().as_ref() { - if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) { + if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new( + wal_contents.clone(), + ))) { error!(%err, "Error sending WAL content to plugins"); } } - self.buffer_contents(write) - } - async fn notify_and_snapshot( - &self, - write: WalContents, - snapshot_details: SnapshotDetails, - ) -> Receiver { - if let Some(sender) = self.plugin_event_tx.lock().as_ref() { - if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) { - error!(%err, "Error sending WAL content to plugins"); + if do_snapshot { + if let Some(snapshot_details) = wal_contents.find_snapshot_details() { + return Some( + self.buffer_contents_and_persist_snapshotted_data( + wal_contents, + snapshot_details, + ) + .await, + ); } } - self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details) - .await + self.buffer_contents(wal_contents); + None } - async fn snapshot(&self, snapshot_details: SnapshotDetails) -> Receiver { - let persist_jobs = self.prepare_persist_jobs_from_chunks( - snapshot_details.end_time_marker, - snapshot_details.last_wal_sequence_number, - None, - ); - self.snapshot_data_and_clear_buffer(persist_jobs, snapshot_details) - .await - } + // // deprecate + // async fn notify_and_snapshot( + // &self, + // write: WalContents, + // snapshot_details: SnapshotDetails, + // ) -> Receiver { + // if let Some(sender) = self.plugin_event_tx.lock().as_ref() { + // if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) { + // error!(%err, "Error sending 
WAL content to plugins"); + // } + // } + // self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details) + // .await + // } fn as_any(&self) -> &dyn Any { self @@ -604,6 +615,8 @@ impl BufferState { } } } + WalOp::ForcedSnapshot(_) => {} + WalOp::Snapshot(_) => {} } } } @@ -640,6 +653,7 @@ impl BufferState { TableBuffer::new(index_columns, SortKey::from_columns(sort_key)) }); for (chunk_time, chunk) in table_chunks.chunk_time_to_chunk { + debug!(">>> adding to table chunks"); table_buffer.buffer_chunk(chunk_time, chunk.rows); } } @@ -854,19 +868,19 @@ mod tests { max_timestamp_ns: batch.max_time_ns, wal_file_number: WalFileSequenceNumber::new(1), ops: vec![WalOp::Write(batch)], - snapshot: None, }; let end_time = wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; // write the lp into the buffer - queryable_buffer.notify(wal_contents); + queryable_buffer.notify(wal_contents, true).await; // now force a snapshot, persisting the data to parquet file. Also, buffer up a new write let snapshot_sequence_number = SnapshotSequenceNumber::new(1); let snapshot_details = SnapshotDetails { snapshot_sequence_number, end_time_marker: end_time, + first_wal_sequence_number: WalFileSequenceNumber::new(1), last_wal_sequence_number: WalFileSequenceNumber::new(2), }; @@ -884,15 +898,15 @@ mod tests { min_timestamp_ns: batch.min_time_ns, max_timestamp_ns: batch.max_time_ns, wal_file_number: WalFileSequenceNumber::new(2), - ops: vec![WalOp::Write(batch)], - snapshot: None, + ops: vec![WalOp::Write(batch), WalOp::Snapshot(snapshot_details)], }; let end_time = wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; let details = queryable_buffer - .notify_and_snapshot(wal_contents, snapshot_details) - .await; + .notify(wal_contents, true) + .await + .unwrap(); let _details = details.await.unwrap(); // validate we have a single persisted file @@ -908,21 +922,22 @@ mod tests { let snapshot_details = SnapshotDetails { snapshot_sequence_number, end_time_marker: end_time, + first_wal_sequence_number: WalFileSequenceNumber::new(3), last_wal_sequence_number: WalFileSequenceNumber::new(3), }; queryable_buffer - .notify_and_snapshot( + .notify( WalContents { persist_timestamp_ms: 0, min_timestamp_ns: 0, max_timestamp_ns: 0, wal_file_number: WalFileSequenceNumber::new(3), - ops: vec![], - snapshot: Some(snapshot_details), + ops: vec![WalOp::Snapshot(snapshot_details)], }, - snapshot_details, + true, ) .await + .unwrap() .await .unwrap();
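For reference, a minimal sketch of how a caller outside the WAL (for example the memory-pressure check in write_buffer/mod.rs) could drive a forced snapshot end to end with the trait surface introduced above. The function name is illustrative; it mirrors the default `flush_buffer_and_cleanup_snapshot`, swapping in `force_flush_buffer`:

use std::sync::Arc;
use influxdb3_wal::Wal;

// Illustrative only, not part of this patch.
async fn force_snapshot_and_cleanup(wal: Arc<dyn Wal>) {
    // Flush whatever is buffered and ask the snapshot tracker for details even
    // if the usual snapshot-size threshold has not been reached; `None` means
    // there was nothing to flush or snapshot.
    if let Some((snapshot_done, permit)) = wal.force_flush_buffer().await {
        // Wait for the notifier to finish persisting the snapshotted data, then
        // delete the covered WAL files while still holding the snapshot permit.
        let details = snapshot_done.await.expect("snapshot should complete");
        wal.cleanup_snapshot(details, permit).await;
    }
}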