diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index bce23d0f26..e64c810057 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -110,7 +110,7 @@ pub trait Loglet: Send + Sync + std::fmt::Debug { /// retry failing appends indefinitely until the loglet is sealed. In that case, such commits /// might still appear to future readers but without returning the commit acknowledgement to /// the original writer. - async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result; + async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result; /// The tail is *the first unwritten position* in the loglet. /// diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index 38028b56b6..13ab271b65 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use futures::{Stream, StreamExt}; use tracing::instrument; -use restate_core::ShutdownError; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber}; use restate_types::logs::{Record, TailState}; @@ -140,10 +139,7 @@ impl LogletWrapper { #[allow(unused)] #[cfg(any(test, feature = "test-util"))] pub async fn append(&self, payload: Record) -> Result { - let commit = self - .enqueue_batch(Arc::new([payload])) - .await - .map_err(AppendError::Shutdown)?; + let commit = self.enqueue_batch(Arc::new([payload])).await?; commit.await } @@ -166,13 +162,10 @@ impl LogletWrapper { ) )] pub async fn append_batch(&self, payloads: Arc<[Record]>) -> Result { - self.enqueue_batch(payloads) - .await - .map_err(AppendError::Shutdown)? - .await + self.enqueue_batch(payloads).await?.await } - pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { + pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { if self.tail_lsn.is_some() { return Ok(Commit::sealed()); } diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 7c0d9c4189..2de244ca99 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -136,7 +136,7 @@ impl Loglet for LocalLoglet { Box::pin(self.tail_watch.to_stream()) } - async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { + async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { // NOTE: This implementation doesn't perform pipelined writes yet. This will block the caller // while the underlying write is in progress and only return the Commit future as resolved. // This is temporary until pipelined writes are fully supported. diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index 8358d9cd6d..1a9bf89778 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -325,7 +325,7 @@ impl Loglet for MemoryLoglet { Box::pin(self.tail_watch.to_stream()) } - async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { + async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { let mut log = self.log.lock().unwrap(); if self.sealed.load(Ordering::Relaxed) { return Ok(LogletCommit::sealed()); diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 8653a1661f..102e44ec33 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -139,7 +139,7 @@ impl Loglet for ReplicatedLoglet { Box::pin(self.known_global_tail.to_stream()) } - async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { + async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { match self.sequencer { SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await, SequencerAccess::Remote { .. } => { diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs index fa9c3ca433..b2128c8a63 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs @@ -32,7 +32,7 @@ use super::{ record_cache::RecordCache, replication::spread_selector::{SelectorStrategy, SpreadSelector}, }; -use crate::loglet::{util::TailOffsetWatch, LogletCommit}; +use crate::loglet::{util::TailOffsetWatch, LogletCommit, OperationError}; use appender::SequencerAppender; #[derive(thiserror::Error, Debug)] @@ -160,7 +160,7 @@ impl Sequencer { pub async fn enqueue_batch( &self, payloads: Arc<[Record]>, - ) -> Result { + ) -> Result { if self .sequencer_shared_state .global_committed_tail()