Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not refresh stuff that is not event-sourced anymore #43

Merged
merged 1 commit into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/src/domain/projectors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod event_store;
pub mod new_indexer;
pub mod projections;
pub mod projections_for_refresh;
189 changes: 189 additions & 0 deletions api/src/domain/projectors/projections_for_refresh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::{convert::TryFrom, sync::Arc};

use anyhow::Result;
use async_trait::async_trait;
use derive_new::new;
use domain::{
BudgetEvent, Event, EventListener, PaymentEvent, PaymentWorkItem, ProjectEvent,
SubscriberCallbackError,
};
use infrastructure::dbclient::{ImmutableRepository, Repository};
use olog::IntoField;
use rust_decimal::Decimal;
use tracing::instrument;

use crate::models::*;

#[allow(clippy::too_many_arguments)]
#[derive(new, Clone)]

Check warning on line 18 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L18

Added line #L18 was not covered by tests
pub struct Projector {
project_budgets_repository: Arc<dyn ImmutableRepository<ProjectsBudget>>,
budget_repository: Arc<dyn Repository<Budget>>,
payment_request_repository: Arc<dyn Repository<PaymentRequest>>,
payment_repository: Arc<dyn Repository<Payment>>,
work_item_repository: Arc<dyn WorkItemRepository>,
projects_rewarded_users_repository: Arc<dyn ProjectsRewardedUserRepository>,
// TODO: replace the repositories below by API call to indexer in another projector
github_repo_index_repository: Arc<dyn GithubRepoIndexRepository>,
github_user_index_repository: Arc<dyn GithubUserIndexRepository>,
}

#[async_trait]
impl EventListener<Event> for Projector {
#[instrument(name = "project_projection", skip(self))]
async fn on_event(&self, event: Event) -> Result<(), SubscriberCallbackError> {
match event {
Event::Budget(event) => match event {
BudgetEvent::Created { id, currency } => {
self.budget_repository.upsert(Budget {
id,
initial_amount: Decimal::ZERO,
remaining_amount: Decimal::ZERO,
currency: currency.try_into()?,
})?;

Check warning on line 43 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L33-L43

Added lines #L33 - L43 were not covered by tests
},
BudgetEvent::Allocated { id, amount, .. } => {
let mut budget = self.budget_repository.find_by_id(id)?;
budget.remaining_amount += amount;
budget.initial_amount += amount;
self.budget_repository.update(budget)?;

Check warning on line 49 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L45-L49

Added lines #L45 - L49 were not covered by tests
},
BudgetEvent::Spent { id, amount } => {
let mut budget = self.budget_repository.find_by_id(id)?;
budget.remaining_amount -= amount;
self.budget_repository.update(budget)?;

Check warning on line 54 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L51-L54

Added lines #L51 - L54 were not covered by tests
},
},
Event::Payment(event) => match event {

Check warning on line 57 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L57

Added line #L57 was not covered by tests
PaymentEvent::Requested {
id: payment_id,
project_id,
requestor_id,
recipient_id,
amount,
reason,
duration_worked,
requested_at,
} => {
self.payment_request_repository
.upsert(PaymentRequest {
id: payment_id,
project_id,
requestor_id,
recipient_id,
amount: *amount.amount(),
currency: amount.currency().try_into()?,
requested_at,
invoice_received_at: None,
hours_worked: duration_worked
.and_then(|duration_worked| {
i32::try_from(duration_worked.num_hours()).ok()
})
.unwrap_or(0),
})
.map_err(|e| {
olog::error!(error = e.to_field(), "payment_request_repository.upsert");
e
})?;

Check warning on line 87 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L59-L87

Added lines #L59 - L87 were not covered by tests

reason.work_items.into_iter().try_for_each(
|work_item| -> Result<(), SubscriberCallbackError> {
let repo_id = match work_item {
PaymentWorkItem::Issue { repo_id, .. }
| PaymentWorkItem::CodeReview { repo_id, .. }
| PaymentWorkItem::PullRequest { repo_id, .. } => repo_id,
};

self.work_item_repository
.try_insert(
(project_id, payment_id, recipient_id, work_item).into(),
)
.map_err(|e| {
olog::error!(
error = e.to_field(),
"error work_item_repository.try_insert"
);
e
})?;

Check warning on line 107 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L89-L107

Added lines #L89 - L107 were not covered by tests

self.github_repo_index_repository.start_indexing(repo_id).map_err(
|e| {
olog::error!(
error = e.to_field(),
"github_repo_index_repository.start_indexing"
);
e
},
)?;
Ok(())
},
)?;

Check warning on line 120 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L109-L120

Added lines #L109 - L120 were not covered by tests

self.github_user_index_repository
.try_insert(GithubUserIndex {
user_id: recipient_id,
..Default::default()
})
.map_err(|e| {
olog::error!(
error = e.to_field(),
"github_user_index_repository.try_insert"
);
e
})?;

Check warning on line 133 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L122-L133

Added lines #L122 - L133 were not covered by tests

self.projects_rewarded_users_repository
.increase_user_reward_count_for_project(&project_id, &recipient_id)
.map_err(|e| {
olog::error!(
error = e.to_field(),
"increase_user_reward_count_for_project"
);
e
})?;

Check warning on line 143 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L135-L143

Added lines #L135 - L143 were not covered by tests
},
PaymentEvent::Cancelled { id: payment_id } => {
let payment_request = self.payment_request_repository.find_by_id(payment_id)?;
self.payment_request_repository.delete(payment_id)?;
self.work_item_repository.delete_by_payment_id(payment_id)?;

Check warning on line 148 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L145-L148

Added lines #L145 - L148 were not covered by tests

self.projects_rewarded_users_repository
.decrease_user_reward_count_for_project(
&payment_request.project_id,
&payment_request.recipient_id,
)?;

Check warning on line 154 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L150-L154

Added lines #L150 - L154 were not covered by tests
},
PaymentEvent::Processed {
id: payment_id,
receipt_id,
amount,
receipt,
processed_at,
} => {
self.payment_repository.upsert(Payment {
id: receipt_id,
amount: *amount.amount(),
currency_code: amount.currency().to_string(),
receipt: serde_json::to_value(receipt)
.map_err(|e| SubscriberCallbackError::Discard(e.into()))?,
request_id: payment_id,
processed_at,
})?;

Check warning on line 171 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L157-L171

Added lines #L157 - L171 were not covered by tests
},
_ => (),

Check warning on line 173 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L173

Added line #L173 was not covered by tests
},
Event::Project(event) => match event {
ProjectEvent::BudgetLinked { id, budget_id, .. } => {
self.project_budgets_repository.try_insert(ProjectsBudget {
project_id: id,
budget_id,
})?;

Check warning on line 180 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L175-L180

Added lines #L175 - L180 were not covered by tests
},
_ => (),

Check warning on line 182 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L182

Added line #L182 was not covered by tests
},
_ => (),

Check warning on line 184 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L184

Added line #L184 was not covered by tests
}

Ok(())
}

Check warning on line 188 in api/src/domain/projectors/projections_for_refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/domain/projectors/projections_for_refresh.rs#L187-L188

Added lines #L187 - L188 were not covered by tests
}
13 changes: 12 additions & 1 deletion api/src/presentation/http/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use rocket::{Build, Rocket};

use crate::{
domain::projectors::{self, new_indexer, projections},
domain::projectors::{self, new_indexer, projections, projections_for_refresh},
infrastructure::{simple_storage, web3::ens},
presentation::{graphql, http, http::github_client_pat_factory::GithubClientPatFactory},
Config,
Expand Down Expand Up @@ -46,6 +46,16 @@
database.clone(),
database.clone(),
);
let projector_for_refresh = projections_for_refresh::Projector::new(
database.clone(),
database.clone(),
database.clone(),
database.clone(),
database.clone(),
database.clone(),
database.clone(),
database.clone(),
);

Check warning on line 58 in api/src/presentation/http/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/http/bootstrap.rs#L49-L58

Added lines #L49 - L58 were not covered by tests

let event_publisher = CompositePublisher::new(vec![
Arc::new(EventPublisher::new(
Expand Down Expand Up @@ -91,6 +101,7 @@
database.clone(),
database,
projector,
projector_for_refresh,

Check warning on line 104 in api/src/presentation/http/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/http/bootstrap.rs#L104

Added line #L104 was not covered by tests
);
Ok(rocket_build)
}
6 changes: 4 additions & 2 deletions api/src/presentation/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use crate::{
application,
domain::{projectors::projections::Projector, DustyBotService, ImageStoreService},
domain::{DustyBotService, ImageStoreService},
infrastructure::web3::ens,
models::*,
presentation::{graphql, http::github_client_pat_factory::GithubClientPatFactory},
Expand Down Expand Up @@ -58,7 +58,8 @@
application_event_store: Arc<dyn EventStore<Application>>,
budget_event_store: Arc<dyn EventStore<Budget>>,
payment_event_store: Arc<dyn EventStore<Payment>>,
projector: Projector,
projector: crate::domain::projectors::projections::Projector,
projector_for_refresh: crate::domain::projectors::projections_for_refresh::Projector,

Check warning on line 62 in api/src/presentation/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/http/mod.rs#L61-L62

Added lines #L61 - L62 were not covered by tests
) -> Rocket<Build> {
let update_user_profile_info_usecase = application::user::update_profile_info::Usecase::new(
user_profile_info_repository.clone(),
Expand Down Expand Up @@ -103,6 +104,7 @@
.manage(budget_event_store)
.manage(payment_event_store)
.manage(projector)
.manage(projector_for_refresh)

Check warning on line 107 in api/src/presentation/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/http/mod.rs#L107

Added line #L107 was not covered by tests
.attach(http::guards::Cors)
.mount(
"/",
Expand Down
6 changes: 3 additions & 3 deletions api/src/presentation/http/routes/internal/events/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use thiserror::Error;
use uuid::Uuid;

use crate::domain::projectors::projections;
use crate::domain::projectors::projections_for_refresh;

#[derive(Debug, Error)]
enum Error {
Expand Down Expand Up @@ -57,11 +57,11 @@
budget_event_store: &State<Arc<dyn EventStore<Budget>>>,
application_event_store: &State<Arc<dyn EventStore<Application>>>,
payment_event_store: &State<Arc<dyn EventStore<Payment>>>,
projector: &State<projections::Projector>,
projector_for_refresh: &State<projections_for_refresh::Projector>,

Check warning on line 60 in api/src/presentation/http/routes/internal/events/refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/http/routes/internal/events/refresh.rs#L60

Added line #L60 was not covered by tests
) -> Result<(), HttpApiProblem> {
let mut registry = Registry::new();

let projector = Arc::new((*projector).clone());
let projector = Arc::new((*projector_for_refresh).clone());

Check warning on line 64 in api/src/presentation/http/routes/internal/events/refresh.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/http/routes/internal/events/refresh.rs#L64

Added line #L64 was not covered by tests

Refresher::new((*project_event_store).clone(), projector.clone()).register(&mut registry)?;
Refresher::new((*application_event_store).clone(), projector.clone())
Expand Down
Loading