Skip to content

Commit

Permalink
feat(spooler): Implement cross stacks spooling (#4107)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Oct 9, 2024
1 parent 4342001 commit 4b58953
Show file tree
Hide file tree
Showing 17 changed files with 1,210 additions and 1,313 deletions.
78 changes: 48 additions & 30 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::{
Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer,
SqliteEnvelopeStack, SqliteEnvelopeStore,
Envelope, EnvelopeBufferImpl, MemoryChecker, MemoryStat, ProjectKeyPair,
SqliteEnvelopeRepository, SqliteEnvelopeStore,
};

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
Expand Down Expand Up @@ -70,7 +70,7 @@ fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box<E
envelope
}

fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
fn benchmark_sqlite_envelope_repository(c: &mut Criterion) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = setup_db(&db_path);
Expand All @@ -81,6 +81,11 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
let mut group = c.benchmark_group("sqlite_envelope_stack");
group.measurement_time(Duration::from_secs(60));

let project_key_pair = ProjectKeyPair::new(
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
);

let disk_batch_size = 1000;
for size in [1_000, 10_000, 100_000].iter() {
for envelope_size in &["small", "medium", "big", "huge"] {
Expand All @@ -97,13 +102,17 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
reset_db(db.clone()).await;
});

let stack = SqliteEnvelopeStack::new(
let config: Arc<Config> = Config::from_json_value(serde_json::json!({
"spool": {
"disk_batch_size": disk_batch_size
}
}))
.unwrap()
.into();

let stack = SqliteEnvelopeRepository::new_with_store(
&config,
envelope_store.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

let mut envelopes = Vec::with_capacity(size);
Expand All @@ -116,7 +125,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
|(mut stack, envelopes)| {
runtime.block_on(async {
for envelope in envelopes {
stack.push(envelope).await.unwrap();
stack.push(project_key_pair, envelope).await.unwrap();
}
});
},
Expand All @@ -134,19 +143,24 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
runtime.block_on(async {
reset_db(db.clone()).await;

let mut stack = SqliteEnvelopeStack::new(
let config: Arc<Config> =
Config::from_json_value(serde_json::json!({
"spool": {
"disk_batch_size": disk_batch_size
}
}))
.unwrap()
.into();

let mut stack = SqliteEnvelopeRepository::new_with_store(
&config,
envelope_store.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

// Pre-fill the stack
for _ in 0..size {
let envelope = mock_envelope(envelope_size);
stack.push(envelope).await.unwrap();
stack.push(project_key_pair, envelope).await.unwrap();
}

stack
Expand All @@ -156,7 +170,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
runtime.block_on(async {
// Benchmark popping
for _ in 0..size {
stack.pop().await.unwrap();
stack.pop(project_key_pair).await.unwrap();
}
});
},
Expand All @@ -175,13 +189,17 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
reset_db(db.clone()).await;
});

let stack = SqliteEnvelopeStack::new(
let config: Arc<Config> = Config::from_json_value(serde_json::json!({
"spool": {
"disk_batch_size": disk_batch_size
}
}))
.unwrap()
.into();

let stack = SqliteEnvelopeRepository::new_with_store(
&config,
envelope_store.clone(),
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

// Pre-generate envelopes
Expand All @@ -196,12 +214,12 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
for _ in 0..size {
if rand::random::<bool>() {
if let Some(envelope) = envelope_iter.next() {
stack.push(envelope).await.unwrap();
stack.push(project_key_pair, envelope).await.unwrap();
}
} else if stack.pop().await.is_err() {
} else if stack.pop(project_key_pair).await.is_err() {
// If pop fails (empty stack), push instead
if let Some(envelope) = envelope_iter.next() {
stack.push(envelope).await.unwrap();
stack.push(project_key_pair, envelope).await.unwrap();
}
}
}
Expand Down Expand Up @@ -262,7 +280,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
EnvelopeBufferImpl::from_config(&config, memory_checker.clone())
.await
.unwrap();
for envelope in envelopes.into_iter() {
Expand Down Expand Up @@ -294,7 +312,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
EnvelopeBufferImpl::from_config(&config, memory_checker.clone())
.await
.unwrap();
let n = envelopes.len();
Expand All @@ -317,6 +335,6 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
group.finish();
}

criterion_group!(sqlite, benchmark_sqlite_envelope_stack);
criterion_group!(sqlite, benchmark_sqlite_envelope_repository);
criterion_group!(buffer, benchmark_envelope_buffer);
criterion_main!(sqlite, buffer);
15 changes: 10 additions & 5 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,17 @@ mod services;
mod statsd;
mod utils;

pub use self::envelope::Envelope; // pub for benchmarks
// pub for benchmarks
pub use self::envelope::Envelope;
// pub for benchmarks
pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
EnvelopeBufferImpl, EnvelopeRepository, ProjectKeyPair, SqliteEnvelopeRepository,
SqliteEnvelopeStore,
};
// pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks
// pub for benchmarks
pub use self::utils::{MemoryChecker, MemoryStat};

#[cfg(test)]
mod testutils;
Expand All @@ -288,7 +293,7 @@ use crate::services::server::HttpServer;
///
/// This effectively boots the entire server application. It blocks the current thread until a
/// shutdown signal is received or a fatal error happens. Behavior of the server is determined by
/// the `config` passed into this funciton.
/// the `config` passed into this function.
pub fn run(config: Config) -> anyhow::Result<()> {
let config = Arc::new(config);
relay_log::info!("relay server starting");
Expand Down
34 changes: 34 additions & 0 deletions relay-server/src/services/buffer/common.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,62 @@
use relay_base_schema::project::ProjectKey;
use std::convert::Infallible;

use crate::services::buffer::envelope_repository::sqlite::SqliteEnvelopeRepositoryError;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::Envelope;

/// Error that occurs while interacting with the envelope buffer.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeBufferError {
#[error("sqlite")]
SqliteStore(#[from] SqliteEnvelopeStoreError),

#[error("sqlite")]
SqliteRepository(#[from] SqliteEnvelopeRepositoryError),

#[error("failed to push envelope to the buffer")]
PushFailed,
}

impl From<Infallible> for EnvelopeBufferError {
fn from(value: Infallible) -> Self {
match value {}
}
}

/// Struct that represents two project keys.
#[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub struct ProjectKeyPair {
/// [`ProjectKey`] of the project of the envelope.
pub own_key: ProjectKey,
/// [`ProjectKey`] of the root project of the trace to which the envelope belongs.
pub sampling_key: ProjectKey,
}

impl ProjectKeyPair {
/// Creates a new [`ProjectKeyPair`] with the given `own_key` and `sampling_key`.
pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self {
Self {
own_key,
sampling_key,
}
}

/// Creates a [`ProjectKeyPair`] from an [`Envelope`].
///
/// The `own_key` is set to the public key from the envelope's metadata.
/// The `sampling_key` is set to the envelope's sampling key if present,
/// otherwise it defaults to the `own_key`.
pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
}

/// Returns an iterator over the project keys.
///
/// The iterator always yields the `own_key` and yields the `sampling_key`
/// only if it's different from the `own_key`.
pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
let Self {
own_key,
Expand Down
Loading

0 comments on commit 4b58953

Please sign in to comment.