2 changes: 1 addition & 1 deletion crates/bifrost/src/loglet/mod.rs
@@ -110,7 +110,7 @@ pub trait Loglet: Send + Sync + std::fmt::Debug {
/// retry failing appends indefinitely until the loglet is sealed. In that case, such commits
/// might still appear to future readers but without returning the commit acknowledgement to
/// the original writer.
- async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError>;
+ async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError>;

/// The tail is *the first unwritten position* in the loglet.
///
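The only change to this trait is the error type: enqueue_batch now returns OperationError instead of a bare ShutdownError, so a loglet can reject an enqueue for terminal reasons other than shutdown without acknowledging the append. Below is a minimal, illustrative stand-in for what such an error type can look like; the variant names and bounds are assumptions for this sketch, not the crate's actual definition.

    use std::sync::Arc;

    // Illustrative stand-ins only; the real types live in the restate crates
    // and may differ in shape.
    #[derive(Debug, Clone, thiserror::Error)]
    #[error("system is shutting down")]
    pub struct ShutdownError;

    #[derive(Debug, Clone, thiserror::Error)]
    pub enum OperationError {
        // Shutdown stays representable, so callers that previously matched on
        // ShutdownError can still observe it.
        #[error(transparent)]
        Shutdown(#[from] ShutdownError),
        // Any other terminal loglet failure, e.g. a local storage error.
        #[error("loglet operation failed: {0}")]
        Other(Arc<dyn std::error::Error + Send + Sync>),
    }

With a shape like this, call sites that only cared about shutdown keep working, while storage-level failures gain their own channel instead of being forced into ShutdownError.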
13 changes: 3 additions & 10 deletions crates/bifrost/src/loglet_wrapper.rs
@@ -16,7 +16,6 @@ use std::sync::Arc;
use futures::{Stream, StreamExt};
use tracing::instrument;

- use restate_core::ShutdownError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber};
use restate_types::logs::{Record, TailState};
@@ -140,10 +139,7 @@ impl LogletWrapper {
#[allow(unused)]
#[cfg(any(test, feature = "test-util"))]
pub async fn append(&self, payload: Record) -> Result<Lsn, AppendError> {
- let commit = self
-     .enqueue_batch(Arc::new([payload]))
-     .await
-     .map_err(AppendError::Shutdown)?;
+ let commit = self.enqueue_batch(Arc::new([payload])).await?;
commit.await
}

@@ -166,13 +162,10 @@
)
)]
pub async fn append_batch(&self, payloads: Arc<[Record]>) -> Result<Lsn, AppendError> {
- self.enqueue_batch(payloads)
-     .await
-     .map_err(AppendError::Shutdown)?
-     .await
+ self.enqueue_batch(payloads).await?.await
}

- pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<Commit, ShutdownError> {
+ pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<Commit, OperationError> {
if self.tail_lsn.is_some() {
return Ok(Commit::sealed());
}
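Dropping the explicit .map_err(AppendError::Shutdown) in favour of a plain ? in append and append_batch implies that OperationError now converts into AppendError. Reusing the stand-in ShutdownError/OperationError from the sketch above, a conversion of roughly this shape would make those call sites compile; the Sealed and Other variants shown for AppendError here are assumptions, not the crate's definition.

    // Simplified AppendError for illustration; the real enum may carry more variants.
    #[derive(Debug, thiserror::Error)]
    pub enum AppendError {
        #[error("loglet was sealed")]
        Sealed,
        #[error(transparent)]
        Shutdown(#[from] ShutdownError),
        #[error("append failed: {0}")]
        Other(std::sync::Arc<dyn std::error::Error + Send + Sync>),
    }

    // This conversion is what lets `self.enqueue_batch(payloads).await?` propagate
    // the error directly instead of mapping it by hand.
    impl From<OperationError> for AppendError {
        fn from(err: OperationError) -> Self {
            match err {
                OperationError::Shutdown(e) => AppendError::Shutdown(e),
                OperationError::Other(e) => AppendError::Other(e),
            }
        }
    }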
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/local_loglet/mod.rs
@@ -136,7 +136,7 @@ impl Loglet for LocalLoglet {
Box::pin(self.tail_watch.to_stream())
}

- async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
+ async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
// NOTE: This implementation doesn't perform pipelined writes yet. This will block the caller
// while the underlying write is in progress and only return the Commit future as resolved.
// This is temporary until pipelined writes are fully supported.
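The NOTE in this hunk explains the temporary non-pipelined behaviour: the write completes before enqueue_batch returns, so the LogletCommit handed back is already resolved. Below is a compact sketch of that control flow, reusing the stand-in error types from the earlier sketches; CommitFuture, Record, LogletOffset and durably_write are illustrative stand-ins, not the crate's API.

    use std::sync::Arc;
    use futures::future::{self, BoxFuture};

    pub struct Record;
    pub type LogletOffset = u64;
    // A commit is conceptually "a future of the offset the batch landed at".
    pub type CommitFuture = BoxFuture<'static, Result<LogletOffset, AppendError>>;

    async fn enqueue_batch_non_pipelined(
        payloads: Arc<[Record]>,
        durably_write: impl std::future::Future<Output = Result<LogletOffset, OperationError>>,
    ) -> Result<CommitFuture, OperationError> {
        let _batch_len = payloads.len();
        // The caller is blocked here until the write finishes (no pipelining yet)...
        let offset = durably_write.await?;
        // ...so the commit it receives is already resolved.
        let commit: CommitFuture = Box::pin(future::ready(Ok::<LogletOffset, AppendError>(offset)));
        Ok(commit)
    }

Once pipelined writes are supported, the same signature allows handing back a commit that resolves later, without changing callers.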
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/memory_loglet.rs
@@ -325,7 +325,7 @@ impl Loglet for MemoryLoglet {
Box::pin(self.tail_watch.to_stream())
}

- async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
+ async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
let mut log = self.log.lock().unwrap();
if self.sealed.load(Ordering::Relaxed) {
return Ok(LogletCommit::sealed());
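As with Commit::sealed() in the wrapper, a sealed memory loglet still returns Ok(commit); the seal only surfaces when the commit is awaited. Using the stand-in CommitFuture and AppendError from the sketches above, LogletCommit::sealed() can be pictured as a commit that is already resolved to a Sealed error (the constructor name comes from the diff; its body here is an assumption).

    use futures::future;

    // A commit that immediately reports the seal to whoever awaits it.
    fn sealed_commit() -> CommitFuture {
        Box::pin(future::ready(Err::<LogletOffset, AppendError>(AppendError::Sealed)))
    }

So a seal presumably reaches the writer as AppendError::Sealed from commit.await, not as an error from enqueue_batch itself.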
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -139,7 +139,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
Box::pin(self.known_global_tail.to_stream())
}

- async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
+ async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
match self.sequencer {
SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
SequencerAccess::Remote { .. } => {
@@ -32,7 +32,7 @@ use super::{
record_cache::RecordCache,
replication::spread_selector::{SelectorStrategy, SpreadSelector},
};
- use crate::loglet::{util::TailOffsetWatch, LogletCommit};
+ use crate::loglet::{util::TailOffsetWatch, LogletCommit, OperationError};
use appender::SequencerAppender;

#[derive(thiserror::Error, Debug)]
@@ -160,7 +160,7 @@ impl<T: TransportConnect> Sequencer<T> {
pub async fn enqueue_batch(
&self,
payloads: Arc<[Record]>,
- ) -> Result<LogletCommit, ShutdownError> {
+ ) -> Result<LogletCommit, OperationError> {
if self
.sequencer_shared_state
.global_committed_tail()
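Taken together, every enqueue path in this diff now shares the same two-stage contract: enqueueing fails only with an OperationError (shutdown or another terminal condition), while seal and append outcomes arrive through the returned commit. Below is a caller-side sketch using the stand-in types from the earlier sketches; append_and_wait and its enqueue parameter are hypothetical, not the sequencer's API.

    use std::sync::Arc;

    // `enqueue` stands in for any of the enqueue_batch implementations touched here.
    async fn append_and_wait<F, Fut>(
        payloads: Arc<[Record]>,
        enqueue: F,
    ) -> Result<LogletOffset, AppendError>
    where
        F: FnOnce(Arc<[Record]>) -> Fut,
        Fut: std::future::Future<Output = Result<CommitFuture, OperationError>>,
    {
        // Stage 1: terminal problems (shutdown, storage failure) are reported here,
        // converted via From<OperationError> for AppendError.
        let commit = enqueue(payloads).await?;
        // Stage 2: the append outcome, including a possible seal, is reported here.
        commit.await
    }

This mirrors append_batch in loglet_wrapper.rs above: enqueue_batch(payloads).await?.await.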