From 4b58953d7817e6abb037063a013e4316789bd231 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 9 Oct 2024 13:44:27 +0200 Subject: [PATCH] feat(spooler): Implement cross stacks spooling (#4107) --- relay-server/benches/benches.rs | 78 +- relay-server/src/lib.rs | 15 +- relay-server/src/services/buffer/common.rs | 34 + .../services/buffer/envelope_buffer/mod.rs | 550 +++++--------- .../buffer/envelope_repository/memory.rs | 75 ++ .../buffer/envelope_repository/mod.rs | 136 ++++ .../buffer/envelope_repository/sqlite.rs | 681 ++++++++++++++++++ .../services/buffer/envelope_stack/memory.rs | 35 - .../src/services/buffer/envelope_stack/mod.rs | 26 - .../services/buffer/envelope_stack/sqlite.rs | 496 ------------- relay-server/src/services/buffer/mod.rs | 30 +- .../services/buffer/stack_provider/memory.rs | 52 -- .../src/services/buffer/stack_provider/mod.rs | 69 -- .../services/buffer/stack_provider/sqlite.rs | 206 ------ relay-server/src/services/buffer/testutils.rs | 24 +- relay-server/src/statsd.rs | 14 +- tests/integration/test_healthchecks.py | 2 +- 17 files changed, 1210 insertions(+), 1313 deletions(-) create mode 100644 relay-server/src/services/buffer/envelope_repository/memory.rs create mode 100644 relay-server/src/services/buffer/envelope_repository/mod.rs create mode 100644 relay-server/src/services/buffer/envelope_repository/sqlite.rs delete mode 100644 relay-server/src/services/buffer/envelope_stack/memory.rs delete mode 100644 relay-server/src/services/buffer/envelope_stack/mod.rs delete mode 100644 relay-server/src/services/buffer/envelope_stack/sqlite.rs delete mode 100644 relay-server/src/services/buffer/stack_provider/memory.rs delete mode 100644 relay-server/src/services/buffer/stack_provider/mod.rs delete mode 100644 relay-server/src/services/buffer/stack_provider/sqlite.rs diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index 045ad1bd93..192aabdfb9 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -11,8 +11,8 @@ use tokio::runtime::Runtime; use relay_base_schema::project::ProjectKey; use relay_server::{ - Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer, - SqliteEnvelopeStack, SqliteEnvelopeStore, + Envelope, EnvelopeBufferImpl, MemoryChecker, MemoryStat, ProjectKeyPair, + SqliteEnvelopeRepository, SqliteEnvelopeStore, }; fn setup_db(path: &PathBuf) -> Pool { @@ -70,7 +70,7 @@ fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box = Config::from_json_value(serde_json::json!({ + "spool": { + "disk_batch_size": disk_batch_size + } + })) + .unwrap() + .into(); + + let stack = SqliteEnvelopeRepository::new_with_store( + &config, envelope_store.clone(), - disk_batch_size, - 2, - ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), - ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), - true, ); let mut envelopes = Vec::with_capacity(size); @@ -116,7 +125,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { |(mut stack, envelopes)| { runtime.block_on(async { for envelope in envelopes { - stack.push(envelope).await.unwrap(); + stack.push(project_key_pair, envelope).await.unwrap(); } }); }, @@ -134,19 +143,24 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { runtime.block_on(async { reset_db(db.clone()).await; - let mut stack = SqliteEnvelopeStack::new( + let config: Arc = + Config::from_json_value(serde_json::json!({ + "spool": { + "disk_batch_size": disk_batch_size + } + })) + .unwrap() + .into(); + + let mut stack = SqliteEnvelopeRepository::new_with_store( + &config, envelope_store.clone(), - disk_batch_size, - 2, - ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), - ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), - true, ); // Pre-fill the stack for _ in 0..size { let envelope = mock_envelope(envelope_size); - stack.push(envelope).await.unwrap(); + stack.push(project_key_pair, envelope).await.unwrap(); } stack @@ -156,7 +170,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { runtime.block_on(async { // Benchmark popping for _ in 0..size { - stack.pop().await.unwrap(); + stack.pop(project_key_pair).await.unwrap(); } }); }, @@ -175,13 +189,17 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { reset_db(db.clone()).await; }); - let stack = SqliteEnvelopeStack::new( + let config: Arc = Config::from_json_value(serde_json::json!({ + "spool": { + "disk_batch_size": disk_batch_size + } + })) + .unwrap() + .into(); + + let stack = SqliteEnvelopeRepository::new_with_store( + &config, envelope_store.clone(), - disk_batch_size, - 2, - ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), - ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), - true, ); // Pre-generate envelopes @@ -196,12 +214,12 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { for _ in 0..size { if rand::random::() { if let Some(envelope) = envelope_iter.next() { - stack.push(envelope).await.unwrap(); + stack.push(project_key_pair, envelope).await.unwrap(); } - } else if stack.pop().await.is_err() { + } else if stack.pop(project_key_pair).await.is_err() { // If pop fails (empty stack), push instead if let Some(envelope) = envelope_iter.next() { - stack.push(envelope).await.unwrap(); + stack.push(project_key_pair, envelope).await.unwrap(); } } } @@ -262,7 +280,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { |envelopes| { runtime.block_on(async { let mut buffer = - PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()) + EnvelopeBufferImpl::from_config(&config, memory_checker.clone()) .await .unwrap(); for envelope in envelopes.into_iter() { @@ -294,7 +312,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { |envelopes| { runtime.block_on(async { let mut buffer = - PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()) + EnvelopeBufferImpl::from_config(&config, memory_checker.clone()) .await .unwrap(); let n = envelopes.len(); @@ -317,6 +335,6 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { group.finish(); } -criterion_group!(sqlite, benchmark_sqlite_envelope_stack); +criterion_group!(sqlite, benchmark_sqlite_envelope_repository); criterion_group!(buffer, benchmark_envelope_buffer); criterion_main!(sqlite, buffer); diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 986db2e5d4..6463a8cc45 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -266,12 +266,17 @@ mod services; mod statsd; mod utils; -pub use self::envelope::Envelope; // pub for benchmarks +// pub for benchmarks +pub use self::envelope::Envelope; +// pub for benchmarks pub use self::services::buffer::{ - EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, -}; // pub for benchmarks + EnvelopeBufferImpl, EnvelopeRepository, ProjectKeyPair, SqliteEnvelopeRepository, + SqliteEnvelopeStore, +}; +// pub for benchmarks pub use self::services::spooler::spool_utils; -pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks +// pub for benchmarks +pub use self::utils::{MemoryChecker, MemoryStat}; #[cfg(test)] mod testutils; @@ -288,7 +293,7 @@ use crate::services::server::HttpServer; /// /// This effectively boots the entire server application. It blocks the current thread until a /// shutdown signal is received or a fatal error happens. Behavior of the server is determined by -/// the `config` passed into this funciton. +/// the `config` passed into this function. pub fn run(config: Config) -> anyhow::Result<()> { let config = Arc::new(config); relay_log::info!("relay server starting"); diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index 924f8aa1c8..4834765db6 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -1,15 +1,40 @@ use relay_base_schema::project::ProjectKey; +use std::convert::Infallible; +use crate::services::buffer::envelope_repository::sqlite::SqliteEnvelopeRepositoryError; +use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::Envelope; +/// Error that occurs while interacting with the envelope buffer. +#[derive(Debug, thiserror::Error)] +pub enum EnvelopeBufferError { + #[error("sqlite")] + SqliteStore(#[from] SqliteEnvelopeStoreError), + + #[error("sqlite")] + SqliteRepository(#[from] SqliteEnvelopeRepositoryError), + + #[error("failed to push envelope to the buffer")] + PushFailed, +} + +impl From for EnvelopeBufferError { + fn from(value: Infallible) -> Self { + match value {} + } +} + /// Struct that represents two project keys. #[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)] pub struct ProjectKeyPair { + /// [`ProjectKey`] of the project of the envelope. pub own_key: ProjectKey, + /// [`ProjectKey`] of the root project of the trace to which the envelope belongs. pub sampling_key: ProjectKey, } impl ProjectKeyPair { + /// Creates a new [`ProjectKeyPair`] with the given `own_key` and `sampling_key`. pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { Self { own_key, @@ -17,12 +42,21 @@ impl ProjectKeyPair { } } + /// Creates a [`ProjectKeyPair`] from an [`Envelope`]. + /// + /// The `own_key` is set to the public key from the envelope's metadata. + /// The `sampling_key` is set to the envelope's sampling key if present, + /// otherwise it defaults to the `own_key`. pub fn from_envelope(envelope: &Envelope) -> Self { let own_key = envelope.meta().public_key(); let sampling_key = envelope.sampling_key().unwrap_or(own_key); Self::new(own_key, sampling_key) } + /// Returns an iterator over the project keys. + /// + /// The iterator always yields the `own_key` and yields the `sampling_key` + /// only if it's different from the `own_key`. pub fn iter(&self) -> impl Iterator { let Self { own_key, diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 0ff0bc2b7e..3696dc0ec2 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -1,195 +1,34 @@ use std::cmp::Ordering; use std::collections::BTreeSet; -use std::convert::Infallible; use std::error::Error; -use std::mem; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::Arc; use std::time::Duration; -use hashbrown::HashSet; +use crate::envelope::Envelope; +use crate::services::buffer::common::{EnvelopeBufferError, ProjectKeyPair}; +use crate::services::buffer::envelope_repository::EnvelopeRepository; +use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; +use crate::MemoryChecker; +use hashbrown::{HashMap, HashSet}; +use priority_queue::PriorityQueue; use relay_base_schema::project::ProjectKey; use relay_config::Config; use tokio::time::{timeout, Instant}; -use crate::envelope::Envelope; -use crate::services::buffer::common::ProjectKeyPair; -use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; -use crate::services::buffer::envelope_stack::EnvelopeStack; -use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; -use crate::services::buffer::stack_provider::memory::MemoryStackProvider; -use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; -use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; -use crate::utils::MemoryChecker; - -/// Polymorphic envelope buffering interface. -/// -/// The underlying buffer can either be disk-based or memory-based, -/// depending on the given configuration. -/// -/// NOTE: This is implemented as an enum because a trait object with async methods would not be -/// object safe. -#[derive(Debug)] -#[allow(private_interfaces)] -pub enum PolymorphicEnvelopeBuffer { - /// An enveloper buffer that uses in-memory envelopes stacks. - InMemory(EnvelopeBuffer), - /// An enveloper buffer that uses sqlite envelopes stacks. - Sqlite(EnvelopeBuffer), -} - -impl PolymorphicEnvelopeBuffer { - /// Returns true if the implementation stores all envelopes in RAM. - pub fn is_memory(&self) -> bool { - match self { - PolymorphicEnvelopeBuffer::InMemory(_) => true, - PolymorphicEnvelopeBuffer::Sqlite(_) => false, - } - } - - /// Creates either a memory-based or a disk-based envelope buffer, - /// depending on the given configuration. - pub async fn from_config( - config: &Config, - memory_checker: MemoryChecker, - ) -> Result { - let buffer = if config.spool_envelopes_path().is_some() { - relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer"); - let buffer = EnvelopeBuffer::::new(config).await?; - Self::Sqlite(buffer) - } else { - relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer"); - let buffer = EnvelopeBuffer::::new(memory_checker); - Self::InMemory(buffer) - }; - - Ok(buffer) - } - - /// Initializes the envelope buffer. - pub async fn initialize(&mut self) { - match self { - PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await, - PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await, - } - } - - /// Adds an envelope to the buffer. - pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { - relay_statsd::metric!(timer(RelayTimers::BufferPush), { - match self { - Self::Sqlite(buffer) => buffer.push(envelope).await, - Self::InMemory(buffer) => buffer.push(envelope).await, - }?; - }); - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); - Ok(()) - } - - /// Returns a reference to the next-in-line envelope. - pub async fn peek(&mut self) -> Result { - relay_statsd::metric!(timer(RelayTimers::BufferPeek), { - match self { - Self::Sqlite(buffer) => buffer.peek().await, - Self::InMemory(buffer) => buffer.peek().await, - } - }) - } - - /// Pops the next-in-line envelope. - pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let envelope = relay_statsd::metric!(timer(RelayTimers::BufferPop), { - match self { - Self::Sqlite(buffer) => buffer.pop().await, - Self::InMemory(buffer) => buffer.pop().await, - }? - }); - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); - Ok(envelope) - } - - /// Marks a project as ready or not ready. - /// - /// The buffer re-prioritizes its envelopes based on this information. - /// Returns `true` if at least one priority was changed. - pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { - match self { - Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), - Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), - } - } - - /// Marks a stack as seen. - /// - /// Non-ready stacks are deprioritized when they are marked as seen, such that - /// the next call to `.peek()` will look at a different stack. This prevents - /// head-of-line blocking. - pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) { - match self { - Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch), - Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch), - } - } - - /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. - pub fn has_capacity(&self) -> bool { - match self { - Self::Sqlite(buffer) => buffer.has_capacity(), - Self::InMemory(buffer) => buffer.has_capacity(), - } - } - - /// Shuts down the [`PolymorphicEnvelopeBuffer`]. - pub async fn shutdown(&mut self) -> bool { - // Currently, we want to flush the buffer only for disk, since the in memory implementation - // tries to not do anything and pop as many elements as possible within the shutdown - // timeout. - let Self::Sqlite(buffer) = self else { - relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed"); - return false; - }; - buffer.flush().await; - - true - } -} - -/// Error that occurs while interacting with the envelope buffer. -#[derive(Debug, thiserror::Error)] -pub enum EnvelopeBufferError { - #[error("sqlite")] - SqliteStore(#[from] SqliteEnvelopeStoreError), - - #[error("sqlite")] - SqliteStack(#[from] SqliteEnvelopeStackError), - - #[error("failed to push envelope to the buffer")] - PushFailed, -} - -impl From for EnvelopeBufferError { - fn from(value: Infallible) -> Self { - match value {} - } -} - /// An envelope buffer that holds an individual stack for each project/sampling project combination. /// /// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope /// is pushed, popped, or when a project becomes ready. #[derive(Debug)] -struct EnvelopeBuffer { +pub struct EnvelopeBuffer { /// The central priority queue. - priority_queue: priority_queue::PriorityQueue, Priority>, - /// A lookup table to find all stacks involving a project. - stacks_by_project: hashbrown::HashMap>, - /// A provider of stacks that provides utilities to create stacks, check their capacity... - /// - /// This indirection is needed because different stack implementations might need different - /// initialization (e.g. a database connection). - stack_provider: P, + priority_queue: PriorityQueue, + /// A lookup table to find all project key pairs for a given project. + project_to_pairs: HashMap>, + /// Repository of envelopes that can provide envelopes via different implementations. + envelope_repository: EnvelopeRepository, /// The total count of envelopes that the buffer is working with. /// /// Note that this count is not meant to be perfectly accurate since the initialization of the @@ -204,155 +43,147 @@ struct EnvelopeBuffer { total_count_initialized: bool, } -impl EnvelopeBuffer { - /// Creates an empty memory-based buffer. - pub fn new(memory_checker: MemoryChecker) -> Self { - Self { - stacks_by_project: Default::default(), - priority_queue: Default::default(), - stack_provider: MemoryStackProvider::new(memory_checker), - total_count: Arc::new(AtomicI64::new(0)), - total_count_initialized: false, - } - } -} +impl EnvelopeBuffer { + /// Creates either a memory-based or a disk-based envelope buffer, + /// depending on the given configuration. + pub async fn from_config( + config: &Config, + memory_checker: MemoryChecker, + ) -> Result { + let buffer = if config.spool_envelopes_path().is_some() { + relay_log::trace!("EnvelopeBuffer: initializing sqlite envelope buffer"); + Self { + project_to_pairs: Default::default(), + priority_queue: Default::default(), + envelope_repository: EnvelopeRepository::sqlite(config).await?, + total_count: Arc::new(AtomicI64::new(0)), + total_count_initialized: false, + } + } else { + relay_log::trace!("EnvelopeBuffer: initializing memory envelope buffer"); + Self { + project_to_pairs: Default::default(), + priority_queue: Default::default(), + envelope_repository: EnvelopeRepository::memory(memory_checker)?, + total_count: Arc::new(AtomicI64::new(0)), + total_count_initialized: false, + } + }; -#[allow(dead_code)] -impl EnvelopeBuffer { - /// Creates an empty sqlite-based buffer. - pub async fn new(config: &Config) -> Result { - Ok(Self { - stacks_by_project: Default::default(), - priority_queue: Default::default(), - stack_provider: SqliteStackProvider::new(config).await?, - total_count: Arc::new(AtomicI64::new(0)), - total_count_initialized: false, - }) + Ok(buffer) } -} -impl EnvelopeBuffer

-where - EnvelopeBufferError: From<::Error>, -{ /// Initializes the [`EnvelopeBuffer`] given the initialization state from the - /// [`StackProvider`]. + /// [`EnvelopeRepository`]. pub async fn initialize(&mut self) { relay_statsd::metric!(timer(RelayTimers::BufferInitialization), { - let initialization_state = self.stack_provider.initialize().await; - self.load_stacks(initialization_state.project_key_pairs) + let initialization_state = self.envelope_repository.initialize().await; + self.load_project_key_pairs(initialization_state.project_key_pairs) .await; self.load_store_total_count().await; }); } - /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack. - /// - /// If the envelope stack does not exist, a new stack is pushed to the priority queue. - /// The priority of the stack is updated with the envelope's received_at time. + /// Pushes an envelope to the [`EnvelopeRepository`] and updates the priority queue accordingly. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { - let received_at = envelope.meta().start_time().into(); - let project_key_pair = ProjectKeyPair::from_envelope(&envelope); - if let Some(( - QueueItem { - key: _, - value: stack, - }, - _, - )) = self.priority_queue.get_mut(&project_key_pair) - { - stack.push(envelope).await?; - } else { - // Since we have initialization code that creates all the necessary stacks, we assume - // that any new stack that is added during the envelope buffer's lifecycle, is recreated. - self.push_stack( - StackCreationType::New, - ProjectKeyPair::from_envelope(&envelope), - Some(envelope), - ) - .await?; - } - self.priority_queue - .change_priority_by(&project_key_pair, |prio| { - prio.received_at = received_at; - }); + relay_statsd::metric!(timer(RelayTimers::BufferPush), { + let received_at = envelope.meta().start_time().into(); + let project_key_pair = ProjectKeyPair::from_envelope(&envelope); + + // If we haven't seen this project key pair, we will add it to the priority queue, otherwise + // we just update its priority. + if self.priority_queue.get_mut(&project_key_pair).is_none() { + self.add(project_key_pair, Some(envelope.as_ref())) + } else { + self.priority_queue + .change_priority_by(&project_key_pair, |prio| { + prio.received_at = received_at; + }); + } - self.total_count.fetch_add(1, AtomicOrdering::SeqCst); - self.track_total_count(); + self.envelope_repository + .push(project_key_pair, envelope) + .await?; + + self.total_count.fetch_add(1, AtomicOrdering::SeqCst); + self.track_total_count(); - Ok(()) + Ok(()) + }) } /// Returns a reference to the next-in-line envelope, if one exists. pub async fn peek(&mut self) -> Result { - let Some(( - QueueItem { - key: stack_key, - value: stack, - }, - Priority { - readiness, - next_project_fetch, - .. - }, - )) = self.priority_queue.peek_mut() - else { - return Ok(Peek::Empty); - }; + relay_statsd::metric!(timer(RelayTimers::BufferPeek), { + let Some((&project_key_pair, priority)) = self.priority_queue.peek() else { + return Ok(Peek::Empty); + }; - let ready = readiness.ready(); + let envelope = self.envelope_repository.peek(project_key_pair).await?; - Ok(match (stack.peek().await?, ready) { - (None, _) => Peek::Empty, - (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => Peek::NotReady(*stack_key, *next_project_fetch, envelope), + Ok(match (envelope, priority.readiness.ready()) { + (None, _) => Peek::Empty, + (Some(envelope), true) => Peek::Ready(envelope), + (Some(envelope), false) => { + Peek::NotReady(project_key_pair, priority.next_project_fetch, envelope) + } + }) }) } /// Returns the next-in-line envelope, if one exists. /// - /// The priority of the envelope's stack is updated with the next envelope's received_at - /// time. If the stack is empty after popping, it is removed from the priority queue. + /// The priority of the [`ProjectKeyPair`] is updated with the next envelope's received_at + /// time. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { - return Ok(None); - }; - let project_key_pair = *key; - let envelope = stack.pop().await.unwrap().expect("found an empty stack"); - - let next_received_at = stack - .peek() - .await? - .map(|next_envelope| next_envelope.meta().start_time().into()); - - match next_received_at { - None => { - relay_statsd::metric!(counter(RelayCounters::BufferEnvelopeStacksPopped) += 1); - self.pop_stack(project_key_pair); - } - Some(next_received_at) => { - self.priority_queue - .change_priority_by(&project_key_pair, |prio| { - prio.received_at = next_received_at; - }); + relay_statsd::metric!(timer(RelayTimers::BufferPop), { + let Some((&project_key_pair, _)) = self.priority_queue.peek() else { + return Ok(None); + }; + + // There must be an envelope when popping, since we call `peek` in the statement above + // and no concurrent access is performed in the meanwhile. + let envelope = self + .envelope_repository + .pop(project_key_pair) + .await? + .expect("pop returned no envelope"); + + let next_received_at = self + .envelope_repository + .peek(project_key_pair) + .await? + .map(|next_envelope| next_envelope.meta().start_time().into()); + match next_received_at { + None => { + relay_statsd::metric!(counter(RelayCounters::BufferEnvelopeStacksPopped) += 1); + self.remove(project_key_pair); + } + Some(next_received_at) => { + self.priority_queue + .change_priority_by(&project_key_pair, |prio| { + prio.received_at = next_received_at; + }); + } } - } - // We are fine with the count going negative, since it represents that more data was popped, - // than it was initially counted, meaning that we had a wrong total count from - // initialization. - self.total_count.fetch_sub(1, AtomicOrdering::SeqCst); - self.track_total_count(); + // We are fine with the count going negative, since it represents that more data was popped, + // than it was initially counted, meaning that we had a wrong total count from + // initialization. + self.total_count.fetch_sub(1, AtomicOrdering::SeqCst); + self.track_total_count(); - Ok(Some(envelope)) + Ok(Some(envelope)) + }) } - /// Re-prioritizes all stacks that involve the given project key by setting it to "ready". + /// Re-prioritizes all [`ProjectKeyPair`]s that involve the given project key by setting it to + /// "ready". /// /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(project_key_pairs) = self.stacks_by_project.get(project) { + if let Some(project_key_pairs) = self.project_to_pairs.get(project) { for project_key_pair in project_key_pairs { self.priority_queue .change_priority_by(project_key_pair, |stack| { @@ -383,7 +214,7 @@ where changed } - /// Marks a stack as seen. + /// Marks a [`ProjectKeyPair`] as seen. /// /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents @@ -397,47 +228,39 @@ where }); } - /// Returns `true` if the underlying storage has the capacity to store more envelopes. + /// Returns `true` if the underlying storage has the capacity to store more envelopes, false + /// otherwise. pub fn has_capacity(&self) -> bool { - self.stack_provider.has_store_capacity() + self.envelope_repository.has_store_capacity() } /// Flushes the envelope buffer. - pub async fn flush(&mut self) { - let priority_queue = mem::take(&mut self.priority_queue); - self.stack_provider - .flush(priority_queue.into_iter().map(|(q, _)| q.value)) - .await; + /// + /// Returns `true` in case after the flushing it is safe to destroy the buffer, `false` + /// otherwise. This is done because we want to make sure we know whether it is safe to drop the + /// [`EnvelopeBuffer`] after flushing is performed. + pub async fn flush(&mut self) -> bool { + self.envelope_repository.flush().await } - /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted. - async fn push_stack( - &mut self, - stack_creation_type: StackCreationType, - project_key_pair: ProjectKeyPair, - envelope: Option>, - ) -> Result<(), EnvelopeBufferError> { + /// Returns `true` if the [`EnvelopeBuffer`] is using an in-memory strategy, false otherwise. + pub fn is_memory(&self) -> bool { + matches!(self.envelope_repository, EnvelopeRepository::Memory(_)) + } + + /// Adds a new [`ProjectKeyPair`] to the `priority_queue` and `project_to_pairs`. + fn add(&mut self, project_key_pair: ProjectKeyPair, envelope: Option<&Envelope>) { let received_at = envelope .as_ref() .map_or(Instant::now(), |e| e.meta().start_time().into()); - let mut stack = self - .stack_provider - .create_stack(stack_creation_type, project_key_pair); - if let Some(envelope) = envelope { - stack.push(envelope).await?; - } - - let previous_entry = self.priority_queue.push( - QueueItem { - key: project_key_pair, - value: stack, - }, - Priority::new(received_at), - ); + let previous_entry = self + .priority_queue + .push(project_key_pair, Priority::new(received_at)); debug_assert!(previous_entry.is_none()); + for project_key in project_key_pair.iter() { - self.stacks_by_project + self.project_to_pairs .entry(project_key) .or_default() .insert(project_key_pair); @@ -445,14 +268,12 @@ where relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); - - Ok(()) } - /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`]. - fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) { + /// Removes a [`ProjectKeyPair`] from the `priority_queue` and `project_to_pairs`. + fn remove(&mut self, project_key_pair: ProjectKeyPair) { for project_key in project_key_pair.iter() { - self.stacks_by_project + self.project_to_pairs .get_mut(&project_key) .expect("project_key is missing from lookup") .remove(&project_key_pair); @@ -464,12 +285,10 @@ where ); } - /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. - async fn load_stacks(&mut self, project_key_pairs: HashSet) { + /// Creates all the priority queue entries given the supplied [`ProjectKeyPair`]s. + async fn load_project_key_pairs(&mut self, project_key_pairs: HashSet) { for project_key_pair in project_key_pairs { - self.push_stack(StackCreationType::Initialization, project_key_pair, None) - .await - .expect("Pushing an empty stack raised an error"); + self.add(project_key_pair, None); } } @@ -479,9 +298,10 @@ where /// will process, besides the count of elements that will be added and removed during its /// lifecycle async fn load_store_total_count(&mut self) { - let total_count = timeout(Duration::from_secs(1), async { - self.stack_provider.store_total_count().await - }) + let total_count = timeout( + Duration::from_secs(1), + self.envelope_repository.store_total_count(), + ) .await; match total_count { Ok(total_count) => { @@ -510,7 +330,7 @@ where relay_statsd::metric!( histogram(RelayHistograms::BufferEnvelopesCount) = total_count, initialized = initialized, - stack_type = self.stack_provider.stack_type() + stack_type = &self.envelope_repository.name() ); } } @@ -522,32 +342,6 @@ pub enum Peek<'a> { NotReady(ProjectKeyPair, Instant, &'a Envelope), } -#[derive(Debug)] -struct QueueItem { - key: K, - value: V, -} - -impl std::borrow::Borrow for QueueItem { - fn borrow(&self) -> &K { - &self.key - } -} - -impl std::hash::Hash for QueueItem { - fn hash(&self, state: &mut H) { - self.key.hash(state); - } -} - -impl PartialEq for QueueItem { - fn eq(&self, other: &Self) -> bool { - self.key == other.key - } -} - -impl Eq for QueueItem {} - #[derive(Debug, Clone)] struct Priority { readiness: Readiness, @@ -681,7 +475,13 @@ mod tests { envelope } - fn mock_config(path: &str) -> Arc { + fn mock_config() -> Arc { + Config::from_json_value(serde_json::json!({})) + .unwrap() + .into() + } + + fn mock_config_with_path(path: &str) -> Arc { Config::from_json_value(serde_json::json!({ "spool": { "envelopes": { @@ -694,10 +494,10 @@ mod tests { } fn mock_memory_checker() -> MemoryChecker { - MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone()) + MemoryChecker::new(MemoryStat::default(), mock_config()) } - async fn peek_project_key(buffer: &mut EnvelopeBuffer) -> ProjectKey { + async fn peek_project_key(buffer: &mut EnvelopeBuffer) -> ProjectKey { buffer .peek() .await @@ -710,7 +510,9 @@ mod tests { #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) + .await + .unwrap(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -781,7 +583,9 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) + .await + .unwrap(); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -808,7 +612,9 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) + .await + .unwrap(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -924,7 +730,9 @@ mod tests { assert_ne!(project_key_pair1, project_key_pair2); - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) + .await + .unwrap(); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -956,7 +764,9 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let mut buffer = EnvelopeBuffer::from_config(&mock_config(), mock_memory_checker()) + .await + .unwrap(); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); @@ -1013,9 +823,9 @@ mod tests { .into_os_string() .into_string() .unwrap(); - let config = mock_config(&path); + let config = mock_config_with_path(&path); let mut store = SqliteEnvelopeStore::prepare(&config).await.unwrap(); - let mut buffer = EnvelopeBuffer::::new(&config) + let mut buffer = EnvelopeBuffer::from_config(&config, mock_memory_checker()) .await .unwrap(); @@ -1029,7 +839,7 @@ mod tests { // We assume that the buffer is empty. assert!(buffer.priority_queue.is_empty()); - assert!(buffer.stacks_by_project.is_empty()); + assert!(buffer.project_to_pairs.is_empty()); buffer.initialize().await; @@ -1038,6 +848,6 @@ mod tests { assert_eq!(buffer.priority_queue.len(), 1); // We expect to have an entry per project key, since we have 1 pair, the total entries // should be 2. - assert_eq!(buffer.stacks_by_project.len(), 2); + assert_eq!(buffer.project_to_pairs.len(), 2); } } diff --git a/relay-server/src/services/buffer/envelope_repository/memory.rs b/relay-server/src/services/buffer/envelope_repository/memory.rs new file mode 100644 index 0000000000..58656935ac --- /dev/null +++ b/relay-server/src/services/buffer/envelope_repository/memory.rs @@ -0,0 +1,75 @@ +use crate::services::buffer::common::ProjectKeyPair; +use crate::{Envelope, MemoryChecker}; +use hashbrown::HashMap; +use std::convert::Infallible; + +/// Provides in-memory storage for envelopes, organized by project key pairs. +#[derive(Debug)] +pub struct MemoryEnvelopeRepository { + #[allow(clippy::vec_box)] + envelopes: HashMap>>, + memory_checker: MemoryChecker, +} + +impl MemoryEnvelopeRepository { + /// Creates a new [`MemoryEnvelopeRepository`] with the given memory checker. + pub fn new(memory_checker: MemoryChecker) -> Self { + Self { + envelopes: HashMap::new(), + memory_checker, + } + } + + /// Pushes an envelope to the repository for the given project key pair. + pub async fn push( + &mut self, + project_key_pair: ProjectKeyPair, + envelope: Box, + ) -> Result<(), Infallible> { + self.envelopes + .entry(project_key_pair) + .or_default() + .push(envelope); + + Ok(()) + } + + /// Peeks at the next envelope for the given project key pair without removing it. + pub async fn peek( + &self, + project_key_pair: ProjectKeyPair, + ) -> Result, Infallible> { + Ok(self + .envelopes + .get(&project_key_pair) + .and_then(|envelopes| envelopes.last().map(|boxed| boxed.as_ref()))) + } + + /// Pops and returns the next envelope for the given project key pair. + pub async fn pop( + &mut self, + project_key_pair: ProjectKeyPair, + ) -> Result>, Infallible> { + Ok(self + .envelopes + .get_mut(&project_key_pair) + .and_then(|envelopes| envelopes.pop())) + } + + /// Attempts to flush envelopes to storage. + pub async fn flush(&mut self) -> bool { + // Only if there are no envelopes we can signal to the caller that it is safe to drop the + // buffer. + self.envelopes.is_empty() + } + + /// Checks if there is capacity to store more envelopes. + pub fn has_store_capacity(&self) -> bool { + self.memory_checker.check_memory().has_capacity() + } + + /// Retrieves the total count of envelopes in the store. + pub fn store_total_count(&self) -> u64 { + self.envelopes.values().map(|e| e.len() as u64).sum() + } +} diff --git a/relay-server/src/services/buffer/envelope_repository/mod.rs b/relay-server/src/services/buffer/envelope_repository/mod.rs new file mode 100644 index 0000000000..c1e2171e95 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_repository/mod.rs @@ -0,0 +1,136 @@ +use crate::services::buffer::common::{EnvelopeBufferError, ProjectKeyPair}; +use crate::services::buffer::envelope_repository::memory::MemoryEnvelopeRepository; +use crate::services::buffer::envelope_repository::sqlite::SqliteEnvelopeRepository; +use crate::{Envelope, MemoryChecker}; +use hashbrown::HashSet; +use relay_config::Config; + +mod memory; +pub mod sqlite; + +/// State of the initialization of the [`EnvelopeRepository`]. +/// +/// This state is necessary for initializing resources whenever a [`EnvelopeRepository`] is used. +#[derive(Debug)] +pub struct InitializationState { + pub project_key_pairs: HashSet, +} + +impl InitializationState { + /// Create a new [`InitializationState`]. + pub fn new(project_key_pairs: HashSet) -> Self { + Self { project_key_pairs } + } + + /// Creates a new empty [`InitializationState`]. + pub fn empty() -> Self { + Self { + project_key_pairs: HashSet::new(), + } + } +} + +/// Represents different types of envelope repositories. +#[derive(Debug)] +pub enum EnvelopeRepository { + /// In-memory envelope repository. + Memory(MemoryEnvelopeRepository), + /// SQLite-based envelope repository. + SQLite(SqliteEnvelopeRepository), +} + +impl EnvelopeRepository { + /// Creates a new memory-based envelope repository. + pub fn memory(memory_checker: MemoryChecker) -> Result { + Ok(Self::Memory(MemoryEnvelopeRepository::new(memory_checker))) + } + + /// Creates a new SQLite-based envelope repository. + pub async fn sqlite(config: &Config) -> Result { + Ok(Self::SQLite(SqliteEnvelopeRepository::new(config).await?)) + } + + /// Initializes the [`EnvelopeRepository`] and returns the initialization state. + pub async fn initialize(&mut self) -> InitializationState { + match self { + EnvelopeRepository::Memory(_) => InitializationState::empty(), + EnvelopeRepository::SQLite(repository) => repository.initialize().await, + } + } + + /// Pushes an envelope to the repository for the given project key pair. + pub async fn push( + &mut self, + project_key_pair: ProjectKeyPair, + envelope: Box, + ) -> Result<(), EnvelopeBufferError> { + match self { + EnvelopeRepository::Memory(repository) => { + repository.push(project_key_pair, envelope).await? + } + EnvelopeRepository::SQLite(repository) => { + repository.push(project_key_pair, envelope).await? + } + } + + Ok(()) + } + + /// Peeks at the next envelope for the given project key pair without removing it. + pub async fn peek( + &mut self, + project_key_pair: ProjectKeyPair, + ) -> Result, EnvelopeBufferError> { + let envelope = match self { + EnvelopeRepository::Memory(repository) => repository.peek(project_key_pair).await?, + EnvelopeRepository::SQLite(repository) => repository.peek(project_key_pair).await?, + }; + + Ok(envelope) + } + + /// Pops and returns the next envelope for the given project key pair. + pub async fn pop( + &mut self, + project_key_pair: ProjectKeyPair, + ) -> Result>, EnvelopeBufferError> { + let envelope = match self { + EnvelopeRepository::Memory(repository) => repository.pop(project_key_pair).await?, + EnvelopeRepository::SQLite(repository) => repository.pop(project_key_pair).await?, + }; + + Ok(envelope) + } + + /// Flushes the [`Envelope`]s in the [`EnvelopeRepository`]. + pub async fn flush(&mut self) -> bool { + match self { + EnvelopeRepository::Memory(repository) => repository.flush().await, + EnvelopeRepository::SQLite(repository) => repository.flush().await, + } + } + + /// Returns `true` when there is space to store more [`Envelope`]s, `false` otherwise. + pub fn has_store_capacity(&self) -> bool { + match self { + EnvelopeRepository::Memory(repository) => repository.has_store_capacity(), + EnvelopeRepository::SQLite(repository) => repository.has_store_capacity(), + } + } + + /// Returns the total count of [`Envelope`]s in the store. + pub async fn store_total_count(&self) -> u64 { + match self { + EnvelopeRepository::Memory(repository) => repository.store_total_count(), + EnvelopeRepository::SQLite(repository) => repository.store_total_count().await, + } + } + + /// Returns the string representation of the [`EnvelopeRepository`]'s strategy. + pub fn name(&self) -> &'static str { + match self { + EnvelopeRepository::Memory(_) => "memory", + EnvelopeRepository::SQLite(_) => "sqlite", + } + } +} diff --git a/relay-server/src/services/buffer/envelope_repository/sqlite.rs b/relay-server/src/services/buffer/envelope_repository/sqlite.rs new file mode 100644 index 0000000000..61b0795349 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_repository/sqlite.rs @@ -0,0 +1,681 @@ +use crate::services::buffer::common::ProjectKeyPair; +use crate::services::buffer::envelope_repository::InitializationState; +use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; +use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; +use crate::{Envelope, SqliteEnvelopeStore}; +use hashbrown::{HashMap, HashSet}; +use relay_config::Config; +use std::error::Error; + +/// An error returned when doing an operation on [`SqliteEnvelopeRepository`]. +#[derive(Debug, thiserror::Error)] +pub enum SqliteEnvelopeRepositoryError { + /// Represents an error that occurred in the envelope store. + #[error("an error occurred in the envelope store: {0}")] + EnvelopeStoreError(#[from] SqliteEnvelopeStoreError), +} + +#[derive(Debug, Default)] +struct EnvelopeStack { + #[allow(clippy::vec_box)] + cached_envelopes: Vec>, + check_disk: bool, +} + +/// A repository for storing and managing envelopes using SQLite as a backend. +/// +/// This struct manages both in-memory and on-disk storage of envelopes, +/// implementing spooling and unspooling mechanisms to balance between +/// memory usage and disk I/O. +#[derive(Debug)] +pub struct SqliteEnvelopeRepository { + envelope_stacks: HashMap, + envelope_store: SqliteEnvelopeStore, + cached_envelopes_size: u64, + disk_batch_size: usize, + max_disk_size: usize, +} + +impl SqliteEnvelopeRepository { + /// Creates a new [`SqliteEnvelopeRepository`] instance. + pub async fn new(config: &Config) -> Result { + let envelope_store = SqliteEnvelopeStore::prepare(config).await?; + Ok(Self { + envelope_stacks: HashMap::new(), + envelope_store, + cached_envelopes_size: 0, + disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), + max_disk_size: config.spool_envelopes_max_disk_size(), + }) + } + + /// Creates a new [`SqliteEnvelopeRepository`] instance given a [`SqliteEnvelopeStore`]. + pub fn new_with_store(config: &Config, envelope_store: SqliteEnvelopeStore) -> Self { + Self { + envelope_stacks: HashMap::new(), + envelope_store, + cached_envelopes_size: 0, + disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), + max_disk_size: config.spool_envelopes_max_disk_size(), + } + } + + /// Initializes the envelope repository. + /// + /// Retrieves the project key pairs from the envelope store and creates + /// an initialization state. + pub async fn initialize(&mut self) -> InitializationState { + match self.envelope_store.project_key_pairs().await { + Ok(project_key_pairs) => { + self.initialize_empty_stacks(&project_key_pairs); + InitializationState::new(project_key_pairs) + } + Err(error) => { + relay_log::error!( + error = &error as &dyn Error, + "failed to initialize the sqlite stack repository" + ); + InitializationState::empty() + } + } + } + + /// Pushes an envelope to the repository for the given project key pair. + /// + /// If the spool threshold is exceeded, it may trigger spooling to disk. + pub async fn push( + &mut self, + project_key_pair: ProjectKeyPair, + envelope: Box, + ) -> Result<(), SqliteEnvelopeRepositoryError> { + if self.above_spool_threshold() { + self.spool_to_disk().await?; + } + + self.envelope_stacks + .entry(project_key_pair) + .or_default() + .cached_envelopes + .push(envelope); + + self.cached_envelopes_size += 1; + + Ok(()) + } + + /// Peeks at the next envelope for the given project key pair without removing it. + /// + /// If no envelope is in the buffer, it will be loaded from disk and a reference will be + /// returned. + pub async fn peek( + &mut self, + project_key_pair: ProjectKeyPair, + ) -> Result, SqliteEnvelopeRepositoryError> { + // If we have no data for the project key pair, we can safely assume we don't have envelopes + // for this pair anywhere. + let Some(envelope_stack) = self.envelope_stacks.get(&project_key_pair) else { + return Ok(None); + }; + + if envelope_stack.cached_envelopes.is_empty() && envelope_stack.check_disk { + let envelopes = self.unspool_from_disk(project_key_pair, 1).await?; + // If we have no envelopes in the buffer and no on disk, we can be safe removing the entry + // in the buffer. + if envelopes.is_empty() { + self.envelope_stacks.remove(&project_key_pair); + return Ok(None); + } + + self.cached_envelopes_size += envelopes.len() as u64; + self.envelope_stacks + .entry(project_key_pair) + .or_default() + .cached_envelopes + .extend(envelopes); + } + + Ok(self + .envelope_stacks + .get(&project_key_pair) + .and_then(|e| e.cached_envelopes.last().map(Box::as_ref))) + } + + /// Pops and returns the next envelope for the given project key pair. + /// + /// If no envelope is in the buffer, it will be loaded from disk. + pub async fn pop( + &mut self, + project_key_pair: ProjectKeyPair, + ) -> Result>, SqliteEnvelopeRepositoryError> { + let envelope = self + .envelope_stacks + .get_mut(&project_key_pair) + .and_then(|envelopes| envelopes.cached_envelopes.pop()); + if let Some(envelope) = envelope { + // We only decrement the counter when removing data from the in memory buffer. + self.cached_envelopes_size -= 1; + return Ok(Some(envelope)); + } + + // If we don't need to check disk, we assume there are no envelopes, so we early return + // `None`. + if !self.should_check_disk(project_key_pair) { + return Ok(None); + } + + // If we have no envelopes in the buffer, we try to pop and immediately return data from + // disk. + let mut envelopes = self.unspool_from_disk(project_key_pair, 1).await?; + // If we have no envelopes in the buffer and no on disk, we can be safe removing the entry + // in the buffer. + if envelopes.is_empty() { + self.envelope_stacks.remove(&project_key_pair); + } + + Ok(envelopes.pop()) + } + + /// Flushes all remaining envelopes to disk. + pub async fn flush(&mut self) -> bool { + relay_statsd::metric!(timer(RelayTimers::BufferFlush), { + let envelope_store = self.envelope_store.clone(); + for batch in self.get_envelope_batches() { + if let Err(error) = Self::insert_envelope_batch(envelope_store.clone(), batch).await + { + relay_log::error!( + error = &error as &dyn Error, + "failed to flush envelopes, some might be lost", + ); + } + } + }); + + true + } + + /// Checks if there's capacity in the store for more envelopes. + pub fn has_store_capacity(&self) -> bool { + (self.envelope_store.usage() as usize) < self.max_disk_size + } + + /// Retrieves the total count of envelopes in the store. + pub async fn store_total_count(&self) -> u64 { + self.envelope_store + .total_count() + .await + .unwrap_or_else(|error| { + relay_log::error!( + error = &error as &dyn Error, + "failed to get the total count of envelopes for the sqlite envelope store", + ); + // In case we have an error, we default to communicating a total count of 0. + 0 + }) + } + + /// Initializes a set of empty [`EnvelopeStack`]s. + fn initialize_empty_stacks(&mut self, project_key_pairs: &HashSet) { + for &project_key_pair in project_key_pairs { + let envelope_stack = self.envelope_stacks.entry(project_key_pair).or_default(); + // When creating an envelope stack during initialization, we assume data is on disk. + envelope_stack.check_disk = true; + } + } + + /// Determines if the number of buffered envelopes is above the spool threshold. + fn above_spool_threshold(&self) -> bool { + self.cached_envelopes_size >= self.disk_batch_size as u64 + } + + /// Spools all buffered envelopes to disk. + async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeRepositoryError> { + let envelope_store = self.envelope_store.clone(); + let mut processed_batches = 0; + for batch in self.get_envelope_batches() { + Self::insert_envelope_batch(envelope_store.clone(), batch).await?; + processed_batches += 1; + } + // We should have only one batch here, since we spool when we reach the batch size. + debug_assert!(processed_batches == 1); + + Ok(()) + } + + /// Unspools from disk up to `n` envelopes and returns them. + async fn unspool_from_disk( + &mut self, + project_key_pair: ProjectKeyPair, + n: u64, + ) -> Result>, SqliteEnvelopeRepositoryError> { + let envelopes = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), { + self.envelope_store + .delete_many( + project_key_pair.own_key, + project_key_pair.sampling_key, + n as i64, + ) + .await + .map_err(SqliteEnvelopeRepositoryError::EnvelopeStoreError)? + }); + + if envelopes.is_empty() { + // In case no envelopes were unspooled, we mark this project key pair as having no + // envelopes on disk. + self.set_check_disk(project_key_pair, false); + + return Ok(vec![]); + } + + relay_statsd::metric!( + counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64 + ); + + Ok(envelopes) + } + + /// Returns `true` whether the disk should be checked for data for a given [`ProjectKeyPair`], + /// false otherwise. + fn should_check_disk(&self, project_key_pair: ProjectKeyPair) -> bool { + // If a project key pair is unknown, we don't want to check disk. + self.envelope_stacks + .get(&project_key_pair) + .map_or(false, |e| e.check_disk) + } + + /// Sets on the [`EnvelopeStack`] whether the disk should be checked or not. + fn set_check_disk(&mut self, project_key_pair: ProjectKeyPair, check_disk: bool) { + if let Some(envelope_stack) = self.envelope_stacks.get_mut(&project_key_pair) { + envelope_stack.check_disk = check_disk; + } + } + + /// Returns batches of envelopes of size `self.disk_batch_size`. + #[allow(clippy::vec_box)] + fn get_envelope_batches(&mut self) -> impl Iterator>> + '_ { + // Create a flat iterator over all the envelopes + let envelope_iter = self.envelope_stacks.values_mut().flat_map(|e| { + e.check_disk = true; + self.cached_envelopes_size -= e.cached_envelopes.len() as u64; + relay_statsd::metric!( + histogram(RelayHistograms::BufferInMemoryEnvelopesPerKeyPair) = + e.cached_envelopes.len() as u64 + ); + e.cached_envelopes.drain(..) + }); + + // Wrap this flat iterator with a custom chunking logic + ChunkedIterator { + inner: envelope_iter, + chunk_size: self.disk_batch_size, + } + } + + /// Inserts a batch of envelopes into the envelope store. + #[allow(clippy::vec_box)] + async fn insert_envelope_batch( + mut envelope_store: SqliteEnvelopeStore, + batch: Vec>, + ) -> Result<(), SqliteEnvelopeRepositoryError> { + if batch.is_empty() { + return Ok(()); + } + + relay_statsd::metric!(counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64); + + // Convert envelopes into a format which simplifies insertion in the store. + let envelopes = batch.iter().filter_map(|e| e.as_ref().try_into().ok()); + + relay_statsd::metric!(timer(RelayTimers::BufferSpool), { + envelope_store + .insert_many(envelopes) + .await + .map_err(SqliteEnvelopeRepositoryError::EnvelopeStoreError)?; + }); + + Ok(()) + } +} + +struct ChunkedIterator +where + I: Iterator>, +{ + inner: I, + chunk_size: usize, +} + +impl Iterator for ChunkedIterator +where + I: Iterator>, +{ + type Item = Vec>; + + fn next(&mut self) -> Option { + let mut batch = Vec::with_capacity(self.chunk_size); + + // Fill up the batch with up to `chunk_size` envelopes + for _ in 0..self.chunk_size { + if let Some(envelope) = self.inner.next() { + batch.push(envelope); + } else { + break; // Stop when there are no more items + } + } + + // Return `None` if no more batches are available + if batch.is_empty() { + None + } else { + Some(batch) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::services::buffer::testutils::utils::{ + mock_envelope, mock_envelopes, mock_envelopes_for_project, setup_db, + }; + use relay_base_schema::project::ProjectKey; + use std::time::{Duration, Instant}; + + async fn setup_repository( + run_migrations: bool, + disk_batch_size: usize, + max_disk_size: usize, + ) -> SqliteEnvelopeRepository { + let db = setup_db(run_migrations).await; + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + + SqliteEnvelopeRepository { + envelope_stacks: HashMap::new(), + envelope_store, + cached_envelopes_size: 0, + disk_batch_size, + max_disk_size, + } + } + + #[tokio::test] + async fn test_initialize_with_unmigrated_db() { + let mut repository = setup_repository(false, 2, 0).await; + + let initialization_state = repository.initialize().await; + assert!(initialization_state.project_key_pairs.is_empty()); + } + + #[tokio::test] + async fn test_push_with_unmigrated_db() { + let mut repository = setup_repository(false, 1, 0).await; + + let project_key_pair = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelope_1 = mock_envelope(Instant::now(), Some(project_key_pair.sampling_key)); + let envelope_2 = mock_envelope(Instant::now(), Some(project_key_pair.sampling_key)); + + // Push should succeed as it doesn't interact with the database initially + assert!(repository.push(project_key_pair, envelope_1).await.is_ok()); + + // Push should fail because after the second insertion we try to spool + let result = repository.push(project_key_pair, envelope_2).await; + assert!(result.is_err()); + if let Err(error) = result { + assert!(matches!( + error, + SqliteEnvelopeRepositoryError::EnvelopeStoreError(_) + )); + } + } + + #[tokio::test] + async fn test_pop_with_unmigrated_db() { + let mut repository = setup_repository(false, 1, 0).await; + + let project_key_pair = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + // We initialize empty stacks to make sure the repository checks for disk + let mut project_key_pairs = HashSet::new(); + project_key_pairs.insert(project_key_pair); + repository.initialize_empty_stacks(&project_key_pairs); + + // Pop should fail because we can't unspool data from disk + let result = repository.pop(project_key_pair).await; + assert!(result.is_err()); + if let Err(error) = result { + assert!(matches!( + error, + SqliteEnvelopeRepositoryError::EnvelopeStoreError(_) + )); + } + } + + #[tokio::test] + async fn test_push_and_pop() { + let mut repository = setup_repository(true, 2, 0).await; + let project_key_pair = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelopes = mock_envelopes(5); + + // Push 5 envelopes + for envelope in envelopes.clone() { + assert!(repository.push(project_key_pair, envelope).await.is_ok()); + } + + // Pop 5 envelopes + for envelope in envelopes.iter().rev() { + let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + + // Ensure the repository is empty + assert!(repository.pop(project_key_pair).await.unwrap().is_none()); + assert!(!repository.envelope_stacks.contains_key(&project_key_pair)); + } + + #[tokio::test] + async fn test_peek() { + let mut repository = setup_repository(true, 2, 0).await; + let project_key_pair = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelope = mock_envelope(Instant::now(), None); + repository + .push(project_key_pair, envelope.clone()) + .await + .unwrap(); + + // Peek at the envelope + let peeked_envelope = repository.peek(project_key_pair).await.unwrap().unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + + // Ensure the envelope is still there after peeking + let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + + #[tokio::test] + async fn test_spool_and_unspool_disk() { + let mut repository = setup_repository(true, 5, 0).await; + let project_key_pair = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelopes = mock_envelopes(15); + + // Push 15 envelopes (should trigger spooling after 5) + for envelope in envelopes.clone() { + assert!(repository.push(project_key_pair, envelope).await.is_ok()); + } + + // Check that we have 5 envelopes in memory (1 batch of 3) + assert_eq!(repository.cached_envelopes_size, 5); + assert_eq!(repository.store_total_count().await, 10); + + // Pop all envelopes + for envelope in envelopes.iter().rev() { + let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap(), + ); + } + + // Ensure the repository is now empty + assert!(repository.pop(project_key_pair).await.unwrap().is_none()); + assert_eq!(repository.cached_envelopes_size, 0); + assert_eq!(repository.store_total_count().await, 0); + } + + #[tokio::test] + async fn test_flush() { + let mut repository = setup_repository(true, 2, 1000).await; + let project_key_pair = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelopes = mock_envelopes(5); + + // Push 5 envelopes + for envelope in envelopes.clone() { + assert!(repository.push(project_key_pair, envelope).await.is_ok()); + } + + // Flush all envelopes to disk + assert!(repository.flush().await); + + // Check that all envelopes are now on disk + assert_eq!(repository.store_total_count().await, 5); + + // Pop all envelopes (should trigger unspool from disk) + for envelope in envelopes.iter().rev() { + let popped_envelope = repository.pop(project_key_pair).await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + + // Ensure the repository is empty + assert!(repository.pop(project_key_pair).await.unwrap().is_none()); + assert_eq!(repository.store_total_count().await, 0); + } + + #[tokio::test] + async fn test_multiple_project_key_pairs() { + let mut repository = setup_repository(true, 2, 1000).await; + let project_key_pair1 = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b28ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ); + let project_key_pair2 = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("c67ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ); + + let envelopes1 = mock_envelopes_for_project(3, project_key_pair1.sampling_key); + let envelopes2 = mock_envelopes_for_project(2, project_key_pair2.sampling_key); + + // Push envelopes for both project key pairs + for envelope in envelopes1.clone() { + assert!(repository.push(project_key_pair1, envelope).await.is_ok()); + } + for envelope in envelopes2.clone() { + assert!(repository.push(project_key_pair2, envelope).await.is_ok()); + } + + // Pop envelopes for project_key_pair1 + for envelope in envelopes1.iter().rev() { + let popped_envelope = repository.pop(project_key_pair1).await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + + // Pop envelopes for project_key_pair2 + for envelope in envelopes2.iter().rev() { + let popped_envelope = repository.pop(project_key_pair2).await.unwrap().unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + + // Ensure both project key pairs are empty + assert!(repository.pop(project_key_pair1).await.unwrap().is_none()); + assert!(repository.pop(project_key_pair2).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_check_disk() { + let mut repository = setup_repository(true, 2, 0).await; + let project_key_pair1 = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b28ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ); + let project_key_pair2 = ProjectKeyPair::new( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("c67ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ); + + // Push 3 envelopes for project_key_pair1 (should trigger spooling) + let envelopes1 = mock_envelopes_for_project(3, project_key_pair1.sampling_key); + for envelope in envelopes1 { + assert!(repository.push(project_key_pair1, envelope).await.is_ok()); + } + + // Since we spool, we expect to be able to check disk for project_key_pair1 + for (&project_key_pair, envelope_stack) in repository.envelope_stacks.iter() { + assert_eq!( + envelope_stack.check_disk, + project_key_pair == project_key_pair1 + ); + } + + // Pop all envelopes for project_key_pair1 + while repository.pop(project_key_pair1).await.unwrap().is_some() {} + assert_eq!(repository.store_total_count().await, 0); + + // Push 1 envelope for project_key_pair2 (should not trigger spooling) + let envelope = mock_envelope(Instant::now(), Some(project_key_pair2.sampling_key)); + assert!(repository.push(project_key_pair2, envelope).await.is_ok()); + + // Flush remaining envelopes to disk + assert!(repository.flush().await); + + // After flushing, we expect to be able to check disk for project_key_pair2 + for (&project_key_pair, envelope_stack) in repository.envelope_stacks.iter() { + assert_eq!( + envelope_stack.check_disk, + project_key_pair == project_key_pair2 + ); + } + + // Pop all envelopes for project_key_pair1 + while repository.pop(project_key_pair2).await.unwrap().is_some() {} + assert_eq!(repository.store_total_count().await, 0); + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs deleted file mode 100644 index ceb771ec95..0000000000 --- a/relay-server/src/services/buffer/envelope_stack/memory.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::convert::Infallible; - -use crate::Envelope; - -use super::EnvelopeStack; - -#[derive(Debug)] -pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); - -impl MemoryEnvelopeStack { - pub fn new() -> Self { - Self(vec![]) - } -} - -impl EnvelopeStack for MemoryEnvelopeStack { - type Error = Infallible; - - async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { - self.0.push(envelope); - Ok(()) - } - - async fn peek(&mut self) -> Result, Self::Error> { - Ok(self.0.last().map(Box::as_ref)) - } - - async fn pop(&mut self) -> Result>, Self::Error> { - Ok(self.0.pop()) - } - - fn flush(self) -> Vec> { - self.0 - } -} diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs deleted file mode 100644 index 8acfff0600..0000000000 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::future::Future; - -use crate::envelope::Envelope; - -pub mod memory; -pub mod sqlite; - -/// A stack-like data structure that holds [`Envelope`]s. -pub trait EnvelopeStack: Send + std::fmt::Debug { - /// The error type that is returned when an error is encountered during reading or writing the - /// [`EnvelopeStack`]. - type Error: std::fmt::Debug; - - /// Pushes an [`Envelope`] on top of the stack. - fn push(&mut self, envelope: Box) -> impl Future>; - - /// Peeks the [`Envelope`] on top of the stack. - fn peek(&mut self) -> impl Future, Self::Error>>; - - /// Pops the [`Envelope`] on top of the stack. - fn pop(&mut self) -> impl Future>, Self::Error>>; - - /// Persists all envelopes in the [`EnvelopeStack`]s to external storage, if possible, - /// and consumes the stack provider. - fn flush(self) -> Vec>; -} diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs deleted file mode 100644 index 703569ddab..0000000000 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ /dev/null @@ -1,496 +0,0 @@ -use std::collections::VecDeque; -use std::fmt::Debug; -use std::num::NonZeroUsize; - -use relay_base_schema::project::ProjectKey; - -use crate::envelope::Envelope; -use crate::services::buffer::envelope_stack::EnvelopeStack; -use crate::services::buffer::envelope_store::sqlite::{ - SqliteEnvelopeStore, SqliteEnvelopeStoreError, -}; -use crate::statsd::{RelayCounters, RelayTimers}; - -/// An error returned when doing an operation on [`SqliteEnvelopeStack`]. -#[derive(Debug, thiserror::Error)] -pub enum SqliteEnvelopeStackError { - #[error("an error occurred in the envelope store: {0}")] - EnvelopeStoreError(#[from] SqliteEnvelopeStoreError), -} - -#[derive(Debug)] -/// An [`EnvelopeStack`] that is implemented on an SQLite database. -/// -/// For efficiency reasons, the implementation has an in-memory buffer that is periodically spooled -/// to disk in a batched way. -pub struct SqliteEnvelopeStack { - /// Shared SQLite database pool which will be used to read and write from disk. - envelope_store: SqliteEnvelopeStore, - /// Threshold defining the maximum number of envelopes in the `batches_buffer` before spooling - /// to disk will take place. - spool_threshold: NonZeroUsize, - /// Size of a batch of envelopes that is written to disk. - batch_size: NonZeroUsize, - /// The project key of the project to which all the envelopes belong. - own_key: ProjectKey, - /// The project key of the root project of the trace to which all the envelopes belong. - sampling_key: ProjectKey, - /// In-memory stack containing all the batches of envelopes that either have not been written to disk yet, or have been read from disk recently. - #[allow(clippy::vec_box)] - batches_buffer: VecDeque>>, - /// The total number of envelopes inside the `batches_buffer`. - batches_buffer_size: usize, - /// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough - /// elements are available in the `batches_buffer`. - check_disk: bool, -} - -impl SqliteEnvelopeStack { - /// Creates a new empty [`SqliteEnvelopeStack`]. - pub fn new( - envelope_store: SqliteEnvelopeStore, - disk_batch_size: usize, - max_batches: usize, - own_key: ProjectKey, - sampling_key: ProjectKey, - check_disk: bool, - ) -> Self { - Self { - envelope_store, - spool_threshold: NonZeroUsize::new(disk_batch_size * max_batches) - .expect("the spool threshold must be > 0"), - batch_size: NonZeroUsize::new(disk_batch_size) - .expect("the disk batch size must be > 0"), - own_key, - sampling_key, - batches_buffer: VecDeque::with_capacity(max_batches), - batches_buffer_size: 0, - check_disk, - } - } - - /// Threshold above which the [`SqliteEnvelopeStack`] will spool data from the `buffer` to disk. - fn above_spool_threshold(&self) -> bool { - self.batches_buffer_size >= self.spool_threshold.get() - } - - /// Threshold below which the [`SqliteEnvelopeStack`] will unspool data from disk to the - /// `buffer`. - fn below_unspool_threshold(&self) -> bool { - self.batches_buffer_size == 0 - } - - /// Spools to disk up to `disk_batch_size` envelopes from the `buffer`. - /// - /// In case there is a failure while writing envelopes, all the envelopes that were enqueued - /// to be written to disk are lost. The explanation for this behavior can be found in the body - /// of the method. - async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { - let Some(envelopes) = self.batches_buffer.pop_front() else { - return Ok(()); - }; - self.batches_buffer_size -= envelopes.len(); - - relay_statsd::metric!( - counter(RelayCounters::BufferSpooledEnvelopes) += envelopes.len() as u64 - ); - - // We convert envelopes into a format which simplifies insertion in the store. If an - // envelope can't be serialized, we will not insert it. - let envelopes = envelopes.iter().filter_map(|e| e.as_ref().try_into().ok()); - - // When early return here, we are acknowledging that the elements that we popped from - // the buffer are lost in case of failure. We are doing this on purposes, since if we were - // to have a database corruption during runtime, and we were to put the values back into - // the buffer we will end up with an infinite cycle. - relay_statsd::metric!(timer(RelayTimers::BufferSpool), { - self.envelope_store - .insert_many(envelopes) - .await - .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; - }); - - // If we successfully spooled to disk, we know that data should be there. - self.check_disk = true; - - Ok(()) - } - - /// Unspools from disk up to `disk_batch_size` envelopes and appends them to the `buffer`. - /// - /// In case a single deletion fails, the affected envelope will not be unspooled and unspooling - /// will continue with the remaining envelopes. - /// - /// In case an envelope fails deserialization due to malformed data in the database, the affected - /// envelope will not be unspooled and unspooling will continue with the remaining envelopes. - async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { - let envelopes = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), { - self.envelope_store - .delete_many( - self.own_key, - self.sampling_key, - self.batch_size.get() as i64, - ) - .await - .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)? - }); - - if envelopes.is_empty() { - // In case no envelopes were unspooled, we will mark the disk as empty until another - // round of spooling takes place. - self.check_disk = false; - - return Ok(()); - } - - relay_statsd::metric!( - counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64 - ); - - // We push in the back of the buffer, since we still want to give priority to - // incoming envelopes that have a more recent timestamp. - self.batches_buffer_size += envelopes.len(); - self.batches_buffer.push_front(envelopes); - - Ok(()) - } - - /// Validates that the incoming [`Envelope`] has the same project keys at the - /// [`SqliteEnvelopeStack`]. - fn validate_envelope(&self, envelope: &Envelope) -> bool { - let own_key = envelope.meta().public_key(); - let sampling_key = envelope.sampling_key().unwrap_or(own_key); - - self.own_key == own_key && self.sampling_key == sampling_key - } -} - -impl EnvelopeStack for SqliteEnvelopeStack { - type Error = SqliteEnvelopeStackError; - - async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { - debug_assert!(self.validate_envelope(&envelope)); - - if self.above_spool_threshold() { - self.spool_to_disk().await?; - } - - // We need to check if the topmost batch has space, if not we have to create a new batch and - // push it in front. - if let Some(last_batch) = self - .batches_buffer - .back_mut() - .filter(|last_batch| last_batch.len() < self.batch_size.get()) - { - last_batch.push(envelope); - } else { - let mut new_batch = Vec::with_capacity(self.batch_size.get()); - new_batch.push(envelope); - self.batches_buffer.push_back(new_batch); - } - - self.batches_buffer_size += 1; - - Ok(()) - } - - async fn peek(&mut self) -> Result, Self::Error> { - if self.below_unspool_threshold() && self.check_disk { - self.unspool_from_disk().await? - } - - let last = self - .batches_buffer - .back() - .and_then(|last_batch| last_batch.last()) - .map(|last_batch| last_batch.as_ref()); - - Ok(last) - } - - async fn pop(&mut self) -> Result>, Self::Error> { - if self.below_unspool_threshold() && self.check_disk { - relay_log::trace!("Unspool from disk"); - self.unspool_from_disk().await? - } - - let result = self.batches_buffer.back_mut().and_then(|last_batch| { - self.batches_buffer_size -= 1; - relay_log::trace!("Popping from memory"); - last_batch.pop() - }); - if result.is_none() { - return Ok(None); - } - - // Since we might leave a batch without elements, we want to pop it from the buffer. - if self - .batches_buffer - .back() - .map_or(false, |last_batch| last_batch.is_empty()) - { - self.batches_buffer.pop_back(); - } - - Ok(result) - } - - fn flush(self) -> Vec> { - self.batches_buffer.into_iter().flatten().collect() - } -} - -#[cfg(test)] -mod tests { - use std::time::{Duration, Instant}; - - use relay_base_schema::project::ProjectKey; - - use super::*; - use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db}; - - #[tokio::test] - #[should_panic] - async fn test_push_with_mismatching_project_keys() { - let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store, - 2, - 2, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - let envelope = mock_envelope(Instant::now()); - let _ = stack.push(envelope).await; - } - - #[tokio::test] - async fn test_push_when_db_is_not_valid() { - let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store, - 2, - 2, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - let envelopes = mock_envelopes(4); - - // We push the 4 envelopes without errors because they are below the threshold. - for envelope in envelopes.clone() { - assert!(stack.push(envelope).await.is_ok()); - } - - // We push 1 more envelope which results in spooling, which fails because of a database - // problem. - let envelope = mock_envelope(Instant::now()); - assert!(matches!( - stack.push(envelope).await, - Err(SqliteEnvelopeStackError::EnvelopeStoreError(_)) - )); - - // The stack now contains the last of the 3 elements that were added. If we add a new one - // we will end up with 2. - let envelope = mock_envelope(Instant::now()); - assert!(stack.push(envelope.clone()).await.is_ok()); - assert_eq!(stack.batches_buffer_size, 3); - - // We pop the remaining elements, expecting the last added envelope to be on top. - let popped_envelope_1 = stack.pop().await.unwrap().unwrap(); - let popped_envelope_2 = stack.pop().await.unwrap().unwrap(); - let popped_envelope_3 = stack.pop().await.unwrap().unwrap(); - assert_eq!( - popped_envelope_1.event_id().unwrap(), - envelope.event_id().unwrap() - ); - assert_eq!( - popped_envelope_2.event_id().unwrap(), - envelopes.clone()[3].event_id().unwrap() - ); - assert_eq!( - popped_envelope_3.event_id().unwrap(), - envelopes.clone()[2].event_id().unwrap() - ); - assert_eq!(stack.batches_buffer_size, 0); - } - - #[tokio::test] - async fn test_pop_when_db_is_not_valid() { - let db = setup_db(false).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store, - 2, - 2, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - // We pop with an invalid db. - assert!(matches!( - stack.pop().await, - Err(SqliteEnvelopeStackError::EnvelopeStoreError(_)) - )); - } - - #[tokio::test] - async fn test_pop_when_stack_is_empty() { - let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store, - 2, - 2, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - // We pop with no elements. - // We pop with no elements. - assert!(stack.pop().await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_push_below_threshold_and_pop() { - let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store, - 5, - 2, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - let envelopes = mock_envelopes(5); - - // We push 5 envelopes. - for envelope in envelopes.clone() { - assert!(stack.push(envelope).await.is_ok()); - } - assert_eq!(stack.batches_buffer_size, 5); - - // We peek the top element. - let peeked_envelope = stack.peek().await.unwrap().unwrap(); - assert_eq!( - peeked_envelope.event_id().unwrap(), - envelopes.clone()[4].event_id().unwrap() - ); - - // We pop 5 envelopes. - for envelope in envelopes.iter().rev() { - let popped_envelope = stack.pop().await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - } - - #[tokio::test] - async fn test_push_above_threshold_and_pop() { - let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store, - 5, - 2, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - let envelopes = mock_envelopes(15); - - // We push 15 envelopes. - for envelope in envelopes.clone() { - assert!(stack.push(envelope).await.is_ok()); - } - assert_eq!(stack.batches_buffer_size, 10); - - // We peek the top element. - let peeked_envelope = stack.peek().await.unwrap().unwrap(); - assert_eq!( - peeked_envelope.event_id().unwrap(), - envelopes.clone()[14].event_id().unwrap() - ); - - // We pop 10 envelopes, and we expect that the last 10 are in memory, since the first 5 - // should have been spooled to disk. - for envelope in envelopes[5..15].iter().rev() { - let popped_envelope = stack.pop().await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - assert_eq!(stack.batches_buffer_size, 0); - - // We peek the top element, which since the buffer is empty should result in a disk load. - let peeked_envelope = stack.peek().await.unwrap().unwrap(); - assert_eq!( - peeked_envelope.event_id().unwrap(), - envelopes.clone()[4].event_id().unwrap() - ); - - // We insert a new envelope, to test the load from disk happening during `peek()` gives - // priority to this envelope in the stack. - let envelope = mock_envelope(Instant::now()); - assert!(stack.push(envelope.clone()).await.is_ok()); - - // We pop and expect the newly inserted element. - let popped_envelope = stack.pop().await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - - // We pop 5 envelopes, which should not result in a disk load since `peek()` already should - // have caused it. - for envelope in envelopes[0..5].iter().rev() { - let popped_envelope = stack.pop().await.unwrap().unwrap(); - assert_eq!( - popped_envelope.event_id().unwrap(), - envelope.event_id().unwrap() - ); - } - assert_eq!(stack.batches_buffer_size, 0); - } - - #[tokio::test] - async fn test_drain() { - let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); - let mut stack = SqliteEnvelopeStack::new( - envelope_store.clone(), - 5, - 1, - ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - true, - ); - - let envelopes = mock_envelopes(5); - - // We push 5 envelopes and check that there is nothing on disk. - for envelope in envelopes.clone() { - assert!(stack.push(envelope).await.is_ok()); - } - assert_eq!(stack.batches_buffer_size, 5); - assert_eq!(envelope_store.total_count().await.unwrap(), 0); - - // We drain the stack and make sure nothing was spooled to disk. - let drained_envelopes = stack.flush(); - assert_eq!(drained_envelopes.into_iter().collect::>().len(), 5); - assert_eq!(envelope_store.total_count().await.unwrap(), 0); - } -} diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index d732452763..ca9493a6e3 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -29,21 +29,21 @@ use crate::utils::ManagedEnvelope; use crate::MemoryChecker; use crate::MemoryStat; -pub use envelope_buffer::EnvelopeBufferError; // pub for benchmarks -pub use envelope_buffer::PolymorphicEnvelopeBuffer; +pub use common::{EnvelopeBufferError, ProjectKeyPair}; // pub for benchmarks -pub use envelope_stack::sqlite::SqliteEnvelopeStack; +pub use envelope_buffer::EnvelopeBuffer as EnvelopeBufferImpl; // pub for benchmarks -pub use envelope_stack::EnvelopeStack; +pub use envelope_repository::sqlite::SqliteEnvelopeRepository; +// pub for benchmarks +pub use envelope_repository::EnvelopeRepository; // pub for benchmarks pub use envelope_store::sqlite::SqliteEnvelopeStore; mod common; mod envelope_buffer; -mod envelope_stack; +mod envelope_repository; mod envelope_store; -mod stack_provider; mod testutils; /// Message interface for [`EnvelopeBufferService`]. @@ -155,7 +155,7 @@ impl EnvelopeBufferService { /// Wait for the configured amount of time and make sure the project cache is ready to receive. async fn ready_to_pop( &mut self, - buffer: &PolymorphicEnvelopeBuffer, + buffer: &EnvelopeBufferImpl, dequeue: bool, ) -> Option> { relay_statsd::metric!( @@ -194,7 +194,7 @@ impl EnvelopeBufferService { /// - We should not pop from disk into memory when relay's overall memory capacity /// has been reached. /// - We need a valid global config to unspool. - async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool) { + async fn system_ready(&self, buffer: &EnvelopeBufferImpl, dequeue: bool) { loop { // We should not unspool from external storage if memory capacity has been reached. // But if buffer storage is in memory, unspooling can reduce memory usage. @@ -216,7 +216,7 @@ impl EnvelopeBufferService { /// Tries to pop an envelope for a ready project. async fn try_pop<'a>( config: &Config, - buffer: &mut PolymorphicEnvelopeBuffer, + buffer: &mut EnvelopeBufferImpl, services: &Services, envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, ) -> Result { @@ -304,7 +304,7 @@ impl EnvelopeBufferService { managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); } - async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) { + async fn handle_message(buffer: &mut EnvelopeBufferImpl, message: EnvelopeBuffer) { match message { EnvelopeBuffer::Push(envelope) => { // NOTE: This function assumes that a project state update for the relevant @@ -333,12 +333,12 @@ impl EnvelopeBufferService { }; } - async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool { + async fn handle_shutdown(buffer: &mut EnvelopeBufferImpl, message: Shutdown) -> bool { // We gracefully shut down only if the shutdown has a timeout. if let Some(shutdown_timeout) = message.timeout { relay_log::trace!("EnvelopeBufferService: shutting down gracefully"); - let shutdown_result = timeout(shutdown_timeout, buffer.shutdown()).await; + let shutdown_result = timeout(shutdown_timeout, buffer.flush()).await; match shutdown_result { Ok(shutdown_result) => { return shutdown_result; @@ -355,7 +355,7 @@ impl EnvelopeBufferService { false } - async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + async fn push(buffer: &mut EnvelopeBufferImpl, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -364,7 +364,7 @@ impl EnvelopeBufferService { } } - fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) { + fn update_observable_state(&self, buffer: &mut EnvelopeBufferImpl) { self.has_capacity .store(buffer.has_capacity(), Ordering::Relaxed); } @@ -383,7 +383,7 @@ impl Service for EnvelopeBufferService { let dequeue1 = dequeue.clone(); tokio::spawn(async move { - let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; + let buffer = EnvelopeBufferImpl::from_config(&config, memory_checker).await; let mut buffer = match buffer { Ok(buffer) => buffer, diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs deleted file mode 100644 index 230db32b34..0000000000 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::services::buffer::common::ProjectKeyPair; -use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; -use crate::services::buffer::stack_provider::{ - InitializationState, StackCreationType, StackProvider, -}; -use crate::utils::MemoryChecker; -use crate::EnvelopeStack; - -#[derive(Debug)] -pub struct MemoryStackProvider { - memory_checker: MemoryChecker, -} - -impl MemoryStackProvider { - /// Creates a new [`MemoryStackProvider`] with a given [`MemoryChecker`] that is used to - /// estimate the capacity. - pub fn new(memory_checker: MemoryChecker) -> Self { - Self { memory_checker } - } -} - -impl StackProvider for MemoryStackProvider { - type Stack = MemoryEnvelopeStack; - - async fn initialize(&self) -> InitializationState { - InitializationState::empty() - } - - fn create_stack(&self, _: StackCreationType, _: ProjectKeyPair) -> Self::Stack { - MemoryEnvelopeStack::new() - } - - fn has_store_capacity(&self) -> bool { - self.memory_checker.check_memory().has_capacity() - } - - async fn store_total_count(&self) -> u64 { - // The memory implementation doesn't have a store, so the count is 0. - 0 - } - - fn stack_type<'a>(&self) -> &'a str { - "memory" - } - - async fn flush(&mut self, envelope_stacks: impl IntoIterator) { - for envelope_stack in envelope_stacks { - // The flushed envelopes will be immediately dropped. - let _ = envelope_stack.flush(); - } - } -} diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs deleted file mode 100644 index 715d70c436..0000000000 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::services::buffer::common::ProjectKeyPair; -use crate::EnvelopeStack; -use hashbrown::HashSet; -use std::future::Future; - -pub mod memory; -pub mod sqlite; - -/// State of the initialization of the [`StackProvider`]. -/// -/// This state is necessary for initializing resources whenever a [`StackProvider`] is used. -#[derive(Debug)] -pub struct InitializationState { - pub project_key_pairs: HashSet, -} - -impl InitializationState { - /// Create a new [`InitializationState`]. - pub fn new(project_key_pairs: HashSet) -> Self { - Self { project_key_pairs } - } - - /// Creates a new empty [`InitializationState`]. - pub fn empty() -> Self { - Self { - project_key_pairs: HashSet::new(), - } - } -} - -/// The creation type for the [`EnvelopeStack`]. -pub enum StackCreationType { - /// An [`EnvelopeStack`] that is created during initialization. - Initialization, - /// An [`EnvelopeStack`] that is created when an envelope is received. - New, -} - -/// A provider of [`EnvelopeStack`] instances that is responsible for creating them. -pub trait StackProvider: std::fmt::Debug { - /// The implementation of [`EnvelopeStack`] that this manager creates. - type Stack: EnvelopeStack; - - /// Initializes the [`StackProvider`]. - fn initialize(&self) -> impl Future; - - /// Creates an [`EnvelopeStack`]. - fn create_stack( - &self, - stack_creation_type: StackCreationType, - project_key_pair: ProjectKeyPair, - ) -> Self::Stack; - - /// Returns `true` if the store used by this [`StackProvider`] has space to add new - /// stacks or items to the stacks. - fn has_store_capacity(&self) -> bool; - - /// Returns the total count of the store used by this [`StackProvider`]. - fn store_total_count(&self) -> impl Future; - - /// Returns the string representation of the stack type offered by this [`StackProvider`]. - fn stack_type<'a>(&self) -> &'a str; - - /// Flushes the supplied [`EnvelopeStack`]s and consumes the [`StackProvider`]. - fn flush( - &mut self, - envelope_stacks: impl IntoIterator, - ) -> impl Future; -} diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs deleted file mode 100644 index 15e4be6c4e..0000000000 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ /dev/null @@ -1,206 +0,0 @@ -use std::error::Error; - -use relay_config::Config; - -use crate::services::buffer::common::ProjectKeyPair; -use crate::services::buffer::envelope_store::sqlite::{ - SqliteEnvelopeStore, SqliteEnvelopeStoreError, -}; -use crate::services::buffer::stack_provider::{ - InitializationState, StackCreationType, StackProvider, -}; -use crate::statsd::RelayTimers; -use crate::{Envelope, EnvelopeStack, SqliteEnvelopeStack}; - -#[derive(Debug)] -pub struct SqliteStackProvider { - envelope_store: SqliteEnvelopeStore, - disk_batch_size: usize, - max_batches: usize, - max_disk_size: usize, - drain_batch_size: usize, -} - -#[warn(dead_code)] -impl SqliteStackProvider { - /// Creates a new [`SqliteStackProvider`] from the provided [`Config`]. - pub async fn new(config: &Config) -> Result { - let envelope_store = SqliteEnvelopeStore::prepare(config).await?; - Ok(Self { - envelope_store, - disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), - max_batches: config.spool_envelopes_stack_max_batches(), - max_disk_size: config.spool_envelopes_max_disk_size(), - drain_batch_size: config.spool_envelopes_stack_disk_batch_size(), - }) - } - - /// Inserts the supplied [`Envelope`]s in the database. - #[allow(clippy::vec_box)] - async fn drain_many(&mut self, envelopes: Vec>) { - if let Err(error) = self - .envelope_store - .insert_many( - envelopes - .into_iter() - .filter_map(|e| e.as_ref().try_into().ok()), - ) - .await - { - relay_log::error!( - error = &error as &dyn Error, - "failed to drain the envelope stacks, some envelopes might be lost", - ); - } - } - - /// Returns `true` when there might be data residing on disk, `false` otherwise. - fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool { - matches!(stack_creation_type, StackCreationType::Initialization) - } -} - -impl StackProvider for SqliteStackProvider { - type Stack = SqliteEnvelopeStack; - - async fn initialize(&self) -> InitializationState { - match self.envelope_store.project_key_pairs().await { - Ok(project_key_pairs) => InitializationState::new(project_key_pairs), - Err(error) => { - relay_log::error!( - error = &error as &dyn Error, - "failed to initialize the sqlite stack provider" - ); - InitializationState::empty() - } - } - } - - fn create_stack( - &self, - stack_creation_type: StackCreationType, - project_key_pair: ProjectKeyPair, - ) -> Self::Stack { - SqliteEnvelopeStack::new( - self.envelope_store.clone(), - self.disk_batch_size, - self.max_batches, - project_key_pair.own_key, - project_key_pair.sampling_key, - // We want to check the disk by default if we are creating the stack for the first time, - // since we might have some data on disk. - // On the other hand, if we are recreating a stack, it means that we popped it because - // it was empty, or we never had data on disk for that stack, so we assume by default - // that there is no need to check disk until some data is spooled. - Self::assume_data_on_disk(stack_creation_type), - ) - } - - fn has_store_capacity(&self) -> bool { - (self.envelope_store.usage() as usize) < self.max_disk_size - } - - async fn store_total_count(&self) -> u64 { - self.envelope_store - .total_count() - .await - .unwrap_or_else(|error| { - relay_log::error!( - error = &error as &dyn Error, - "failed to get the total count of envelopes for the sqlite envelope store", - ); - // In case we have an error, we default to communicating a total count of 0. - 0 - }) - } - - fn stack_type<'a>(&self) -> &'a str { - "sqlite" - } - - async fn flush(&mut self, envelope_stacks: impl IntoIterator) { - relay_log::trace!("Flushing sqlite envelope buffer"); - - relay_statsd::metric!(timer(RelayTimers::BufferDrain), { - let mut envelopes = Vec::with_capacity(self.drain_batch_size); - for envelope_stack in envelope_stacks { - for envelope in envelope_stack.flush() { - if envelopes.len() >= self.drain_batch_size { - self.drain_many(envelopes).await; - envelopes = Vec::with_capacity(self.drain_batch_size); - } - - envelopes.push(envelope); - } - } - - if !envelopes.is_empty() { - self.drain_many(envelopes).await; - } - }); - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use relay_base_schema::project::ProjectKey; - use relay_config::Config; - use uuid::Uuid; - - use crate::services::buffer::common::ProjectKeyPair; - use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; - use crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; - use crate::services::buffer::testutils::utils::mock_envelopes; - use crate::EnvelopeStack; - - fn mock_config() -> Arc { - let path = std::env::temp_dir() - .join(Uuid::new_v4().to_string()) - .into_os_string() - .into_string() - .unwrap(); - - Config::from_json_value(serde_json::json!({ - "spool": { - "envelopes": { - "path": path, - "disk_batch_size": 100, - "max_batches": 1, - } - } - })) - .unwrap() - .into() - } - - #[tokio::test] - async fn test_flush() { - let config = mock_config(); - let mut stack_provider = SqliteStackProvider::new(&config).await.unwrap(); - - let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); - let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); - - let mut envelope_stack = stack_provider.create_stack( - StackCreationType::New, - ProjectKeyPair::new(own_key, sampling_key), - ); - - let envelopes = mock_envelopes(10); - for envelope in envelopes { - envelope_stack.push(envelope).await.unwrap(); - } - - let envelope_store = stack_provider.envelope_store.clone(); - - // We make sure that no data is on disk since we will spool when more than 100 elements are - // in the in-memory stack. - assert_eq!(envelope_store.total_count().await.unwrap(), 0); - - // We drain the stack provider, and we expect all in-memory envelopes to be spooled to disk. - stack_provider.flush(vec![envelope_stack]).await; - assert_eq!(envelope_store.total_count().await.unwrap(), 10); - } -} diff --git a/relay-server/src/services/buffer/testutils.rs b/relay-server/src/services/buffer/testutils.rs index 277bbe58f5..41fb750b93 100644 --- a/relay-server/src/services/buffer/testutils.rs +++ b/relay-server/src/services/buffer/testutils.rs @@ -55,13 +55,15 @@ pub mod utils { RequestMeta::new(dsn) } - pub fn mock_envelope(instant: Instant) -> Box { + pub fn mock_envelope(instant: Instant, sampling_key: Option) -> Box { let event_id = EventId::new(); let mut envelope = Envelope::from_request(Some(event_id), request_meta()); + let public_key = + sampling_key.unwrap_or(ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap()); let dsc = DynamicSamplingContext { trace_id: Uuid::new_v4(), - public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + public_key, release: Some("1.1.1".to_string()), user: Default::default(), replay_id: None, @@ -84,7 +86,23 @@ pub mod utils { pub fn mock_envelopes(count: usize) -> Vec> { let instant = Instant::now(); (0..count) - .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) + .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64), None)) + .collect() + } + + #[allow(clippy::vec_box)] + pub fn mock_envelopes_for_project( + count: usize, + sampling_key: ProjectKey, + ) -> Vec> { + let instant = Instant::now(); + (0..count) + .map(|i| { + mock_envelope( + instant - Duration::from_secs((count - i) as u64), + Some(sampling_key), + ) + }) .collect() } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index fbae91d779..fb7489261e 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -181,6 +181,8 @@ pub enum RelayHistograms { /// Number of envelopes in the backpressure buffer between the envelope buffer /// and the project cache. BufferBackpressureEnvelopesCount, + /// Number of envelopes in the buffer per key pair of projects. + BufferInMemoryEnvelopesPerKeyPair, /// The number of batches emitted per partition. BatchesPerPartition, /// The number of buckets in a batch emitted. @@ -309,6 +311,9 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::BufferBackpressureEnvelopesCount => { "buffer.backpressure_envelopes_count" } + RelayHistograms::BufferInMemoryEnvelopesPerKeyPair => { + "buffer.in_memory_envelopes_per_key_pair" + } RelayHistograms::ProjectStatePending => "project_state.pending", RelayHistograms::ProjectStateAttempts => "project_state.attempts", RelayHistograms::ProjectStateRequestBatchSize => "project_state.request.batch_size", @@ -536,8 +541,8 @@ pub enum RelayTimers { BufferPeek, /// Timing in milliseconds for the time it takes for the buffer to pop. BufferPop, - /// Timing in milliseconds for the time it takes for the buffer to drain its envelopes. - BufferDrain, + /// Timing in milliseconds for the time it takes for the buffer to flush its envelopes. + BufferFlush, } impl TimerMetric for RelayTimers { @@ -585,7 +590,7 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferPush => "buffer.push.duration", RelayTimers::BufferPeek => "buffer.peek.duration", RelayTimers::BufferPop => "buffer.pop.duration", - RelayTimers::BufferDrain => "buffer.drain.duration", + RelayTimers::BufferFlush => "buffer.flush.duration", } } } @@ -636,8 +641,7 @@ pub enum RelayCounters { /// This happens when the envelope buffer falsely assumes that the envelope's projects are loaded /// in the cache and sends the envelope onward, even though the project cache cannot handle it. BufferEnvelopesReturned, - /// Number of times an envelope stack is popped from the priority queue of stacks in the - /// envelope buffer. + /// Number of times a project key pair is popped from the envelope provider. BufferEnvelopeStacksPopped, /// Number of times an envelope from the buffer is trying to be popped. BufferTryPop, diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py index 69063a371f..5d5a7f87c2 100644 --- a/tests/integration/test_healthchecks.py +++ b/tests/integration/test_healthchecks.py @@ -188,7 +188,7 @@ def test_readiness_disk_spool(mini_sentry, relay): # Second sent event can trigger error on the relay size, since the spool is full now. for _ in range(20): - # It takes ~10 events to make SQLlite use more pages. + # It takes ~10 events to make SQLite use more pages. try: relay.send_event(project_key) except HTTPError as e: