Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Oct 16, 2024
1 parent cd84927 commit 9357383
Show file tree
Hide file tree
Showing 11 changed files with 535 additions and 2 deletions.
84 changes: 84 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ num-traits = "0.2.18"
num_cpus = "1.13.0"
once_cell = "1.13.1"
opentelemetry-proto = { version = "0.7.0", default-features = false }
papaya = "0.1.4"
parking_lot = "0.12.1"
path-slash = "0.2.1"
pest = "2.1.3"
Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ mime = { workspace = true }
minidump = { workspace = true, optional = true }
multer = { workspace = true }
once_cell = { workspace = true }
papaya = { workspace = true }
pin-project-lite = { workspace = true }
priority-queue = { workspace = true }
rand = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/projects/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ impl FromMessage<UpdateProject> for ProjectCache {
///
/// See [`RequestUpdate`] for a description on how project states are fetched.
#[derive(Clone, Debug)]
struct ProjectSource {
pub struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
Expand Down Expand Up @@ -489,7 +489,7 @@ impl ProjectSource {
}
}

async fn fetch(
pub async fn fetch(
self,
project_key: ProjectKey,
no_cache: bool,
Expand Down
24 changes: 24 additions & 0 deletions relay-server/src/services/projects/cache2/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::sync::Arc;

use relay_base_schema::project::ProjectKey;
use relay_system::Addr;

use super::state::Shared;
use crate::services::projects::cache2::service::ProjectCache;

pub struct ProjectCacheHandle {
shared: Arc<Shared>,
service: Addr<ProjectCache>,
}

impl ProjectCacheHandle {
pub fn get(&self, project_key: ProjectKey) -> Option<()> {
match self.shared.get(project_key) {
Some(project) => Some(project),
None => {
self.service.send(ProjectCache::Fetch(project_key));
None
}
}
}
}
43 changes: 43 additions & 0 deletions relay-server/src/services/projects/cache2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::sync::Arc;

use relay_base_schema::project::ProjectKey;

mod refresh;
mod handle;
mod service;
mod state;

// pub struct ProjectCacheService {
// inner: Inner,
// }
//
// impl ProjectCacheService {
// fn handle(&mut self, message: super::cache::ProjectCache) {
// match message {
// super::cache::ProjectCache::Get(_, _) => todo!(),
// super::cache::ProjectCache::GetCached(_, _) => todo!(),
// super::cache::ProjectCache::CheckEnvelope(_, _) => todo!(),
// super::cache::ProjectCache::ValidateEnvelope(_) => todo!(),
// super::cache::ProjectCache::UpdateRateLimits(_) => todo!(),
// super::cache::ProjectCache::ProcessMetrics(_) => todo!(),
// super::cache::ProjectCache::AddMetricMeta(_) => todo!(),
// super::cache::ProjectCache::FlushBuckets(_) => todo!(),
//
// // Spooling
// super::cache::ProjectCache::UpdateSpoolIndex(_) => todo!(),
// super::cache::ProjectCache::RefreshIndexCache(_) => todo!(),
//
// // Sent by the envelope buffer to force update a project.
// // Eventually the envelope buffer needs a response whether the project is ready/not
// // ready.
// //
// // Like GetCached but with less params and no response.
// super::cache::ProjectCache::UpdateProject(_) => todo!(),
//
//
// // This is the typical fetch.
// super::cache::ProjectCache::RequestUpdate(_) => todo!(),
// }
// }
//
// }
Empty file.
106 changes: 106 additions & 0 deletions relay-server/src/services/projects/cache2/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::sync::Arc;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use tokio::sync::mpsc;

use crate::services::projects::cache::ProjectSource;
use crate::services::projects::cache2::state::{CompletedFetch, Fetch};
use crate::services::projects::project::{ProjectFetchState, ProjectState};

pub enum ProjectCache {
Fetch(ProjectKey),
}

impl relay_system::Interface for ProjectCache {}

impl relay_system::FromMessage<Self> for ProjectCache {
type Response = relay_system::NoResponse;

fn from_message(message: Self, _: ()) -> Self {
message
}
}

pub struct ProjectCacheService {
inner: super::state::Inner,
source: ProjectSource,
config: Arc<Config>,

project_update_rx: mpsc::UnboundedReceiver<CompletedFetch>,
project_update_tx: mpsc::UnboundedSender<CompletedFetch>,
}

impl ProjectCacheService {
fn schedule_fetch(&self, fetch: Fetch) {
let source = self.source.clone();
let project_updates = self.project_update_tx.clone();

tokio::spawn(async move {
tokio::time::sleep_until(fetch.when().into()).await;

// TODO: cached state for delta fetches, maybe this should just be a revision?
let state = match source
.fetch(fetch.project_key(), false, ProjectFetchState::pending())
.await
{
// TODO: verify if the sanitized here is correct
Ok(state) => state.sanitized().into(),
Err(err) => {
relay_log::error!(
error = &err as &dyn std::error::Error,
"failed to fetch project state for {fetch:?}"
);
ProjectState::Pending
}
};

project_updates.send(fetch.complete(state));
});
}
}

impl ProjectCacheService {
fn handle_fetch(&mut self, project_key: ProjectKey) {
if let Some(fetch) = self.inner.try_begin_fetch(project_key, &self.config) {
self.schedule_fetch(fetch);
}
}

fn handle_project_update(&mut self, fetch: CompletedFetch) {
if let Some(fetch) = self.inner.complete_fetch(fetch, &self.config) {
relay_log::trace!(
project_key = fetch.project_key().as_str(),
"re-scheduling project fetch: {fetch:?}"
);
self.schedule_fetch(fetch);
}
}

fn handle(&mut self, message: ProjectCache) {
match message {
ProjectCache::Fetch(project_key) => self.handle_fetch(project_key),
}
}
}

impl relay_system::Service for ProjectCacheService {
type Interface = ProjectCache;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
loop {
tokio::select! {
biased;

Some(update) = self.project_update_rx.recv() => {
self.handle_project_update(update)
},
Some(message) = rx.recv() => {
self.handle(message);
}
}
}
});
}
}
Loading

0 comments on commit 9357383

Please sign in to comment.