diff --git a/api/src/domain/projectors/mod.rs b/api/src/domain/projectors/mod.rs index 76fb8d347f..3d2055a1c4 100644 --- a/api/src/domain/projectors/mod.rs +++ b/api/src/domain/projectors/mod.rs @@ -1,3 +1,4 @@ pub mod event_store; pub mod new_indexer; pub mod projections; +pub mod projections_for_refresh; diff --git a/api/src/domain/projectors/projections_for_refresh.rs b/api/src/domain/projectors/projections_for_refresh.rs new file mode 100644 index 0000000000..72a57a55e5 --- /dev/null +++ b/api/src/domain/projectors/projections_for_refresh.rs @@ -0,0 +1,189 @@ +use std::{convert::TryFrom, sync::Arc}; + +use anyhow::Result; +use async_trait::async_trait; +use derive_new::new; +use domain::{ + BudgetEvent, Event, EventListener, PaymentEvent, PaymentWorkItem, ProjectEvent, + SubscriberCallbackError, +}; +use infrastructure::dbclient::{ImmutableRepository, Repository}; +use olog::IntoField; +use rust_decimal::Decimal; +use tracing::instrument; + +use crate::models::*; + +#[allow(clippy::too_many_arguments)] +#[derive(new, Clone)] +pub struct Projector { + project_budgets_repository: Arc>, + budget_repository: Arc>, + payment_request_repository: Arc>, + payment_repository: Arc>, + work_item_repository: Arc, + projects_rewarded_users_repository: Arc, + // TODO: replace the repositories below by API call to indexer in another projector + github_repo_index_repository: Arc, + github_user_index_repository: Arc, +} + +#[async_trait] +impl EventListener for Projector { + #[instrument(name = "project_projection", skip(self))] + async fn on_event(&self, event: Event) -> Result<(), SubscriberCallbackError> { + match event { + Event::Budget(event) => match event { + BudgetEvent::Created { id, currency } => { + self.budget_repository.upsert(Budget { + id, + initial_amount: Decimal::ZERO, + remaining_amount: Decimal::ZERO, + currency: currency.try_into()?, + })?; + }, + BudgetEvent::Allocated { id, amount, .. } => { + let mut budget = self.budget_repository.find_by_id(id)?; + budget.remaining_amount += amount; + budget.initial_amount += amount; + self.budget_repository.update(budget)?; + }, + BudgetEvent::Spent { id, amount } => { + let mut budget = self.budget_repository.find_by_id(id)?; + budget.remaining_amount -= amount; + self.budget_repository.update(budget)?; + }, + }, + Event::Payment(event) => match event { + PaymentEvent::Requested { + id: payment_id, + project_id, + requestor_id, + recipient_id, + amount, + reason, + duration_worked, + requested_at, + } => { + self.payment_request_repository + .upsert(PaymentRequest { + id: payment_id, + project_id, + requestor_id, + recipient_id, + amount: *amount.amount(), + currency: amount.currency().try_into()?, + requested_at, + invoice_received_at: None, + hours_worked: duration_worked + .and_then(|duration_worked| { + i32::try_from(duration_worked.num_hours()).ok() + }) + .unwrap_or(0), + }) + .map_err(|e| { + olog::error!(error = e.to_field(), "payment_request_repository.upsert"); + e + })?; + + reason.work_items.into_iter().try_for_each( + |work_item| -> Result<(), SubscriberCallbackError> { + let repo_id = match work_item { + PaymentWorkItem::Issue { repo_id, .. } + | PaymentWorkItem::CodeReview { repo_id, .. } + | PaymentWorkItem::PullRequest { repo_id, .. } => repo_id, + }; + + self.work_item_repository + .try_insert( + (project_id, payment_id, recipient_id, work_item).into(), + ) + .map_err(|e| { + olog::error!( + error = e.to_field(), + "error work_item_repository.try_insert" + ); + e + })?; + + self.github_repo_index_repository.start_indexing(repo_id).map_err( + |e| { + olog::error!( + error = e.to_field(), + "github_repo_index_repository.start_indexing" + ); + e + }, + )?; + Ok(()) + }, + )?; + + self.github_user_index_repository + .try_insert(GithubUserIndex { + user_id: recipient_id, + ..Default::default() + }) + .map_err(|e| { + olog::error!( + error = e.to_field(), + "github_user_index_repository.try_insert" + ); + e + })?; + + self.projects_rewarded_users_repository + .increase_user_reward_count_for_project(&project_id, &recipient_id) + .map_err(|e| { + olog::error!( + error = e.to_field(), + "increase_user_reward_count_for_project" + ); + e + })?; + }, + PaymentEvent::Cancelled { id: payment_id } => { + let payment_request = self.payment_request_repository.find_by_id(payment_id)?; + self.payment_request_repository.delete(payment_id)?; + self.work_item_repository.delete_by_payment_id(payment_id)?; + + self.projects_rewarded_users_repository + .decrease_user_reward_count_for_project( + &payment_request.project_id, + &payment_request.recipient_id, + )?; + }, + PaymentEvent::Processed { + id: payment_id, + receipt_id, + amount, + receipt, + processed_at, + } => { + self.payment_repository.upsert(Payment { + id: receipt_id, + amount: *amount.amount(), + currency_code: amount.currency().to_string(), + receipt: serde_json::to_value(receipt) + .map_err(|e| SubscriberCallbackError::Discard(e.into()))?, + request_id: payment_id, + processed_at, + })?; + }, + _ => (), + }, + Event::Project(event) => match event { + ProjectEvent::BudgetLinked { id, budget_id, .. } => { + self.project_budgets_repository.try_insert(ProjectsBudget { + project_id: id, + budget_id, + })?; + }, + _ => (), + }, + _ => (), + } + + Ok(()) + } +} diff --git a/api/src/presentation/http/bootstrap.rs b/api/src/presentation/http/bootstrap.rs index 7f1d2cc4a4..3e3fefa17a 100644 --- a/api/src/presentation/http/bootstrap.rs +++ b/api/src/presentation/http/bootstrap.rs @@ -9,7 +9,7 @@ use reqwest::header::HeaderMap; use rocket::{Build, Rocket}; use crate::{ - domain::projectors::{self, new_indexer, projections}, + domain::projectors::{self, new_indexer, projections, projections_for_refresh}, infrastructure::{simple_storage, web3::ens}, presentation::{graphql, http, http::github_client_pat_factory::GithubClientPatFactory}, Config, @@ -46,6 +46,16 @@ pub async fn bootstrap(config: Config) -> Result> { database.clone(), database.clone(), ); + let projector_for_refresh = projections_for_refresh::Projector::new( + database.clone(), + database.clone(), + database.clone(), + database.clone(), + database.clone(), + database.clone(), + database.clone(), + database.clone(), + ); let event_publisher = CompositePublisher::new(vec![ Arc::new(EventPublisher::new( @@ -91,6 +101,7 @@ pub async fn bootstrap(config: Config) -> Result> { database.clone(), database, projector, + projector_for_refresh, ); Ok(rocket_build) } diff --git a/api/src/presentation/http/mod.rs b/api/src/presentation/http/mod.rs index 071126009b..7bcf805934 100644 --- a/api/src/presentation/http/mod.rs +++ b/api/src/presentation/http/mod.rs @@ -12,7 +12,7 @@ use rocket::{Build, Rocket}; use crate::{ application, - domain::{projectors::projections::Projector, DustyBotService, ImageStoreService}, + domain::{DustyBotService, ImageStoreService}, infrastructure::web3::ens, models::*, presentation::{graphql, http::github_client_pat_factory::GithubClientPatFactory}, @@ -58,7 +58,8 @@ pub fn serve( application_event_store: Arc>, budget_event_store: Arc>, payment_event_store: Arc>, - projector: Projector, + projector: crate::domain::projectors::projections::Projector, + projector_for_refresh: crate::domain::projectors::projections_for_refresh::Projector, ) -> Rocket { let update_user_profile_info_usecase = application::user::update_profile_info::Usecase::new( user_profile_info_repository.clone(), @@ -103,6 +104,7 @@ pub fn serve( .manage(budget_event_store) .manage(payment_event_store) .manage(projector) + .manage(projector_for_refresh) .attach(http::guards::Cors) .mount( "/", diff --git a/api/src/presentation/http/routes/internal/events/refresh.rs b/api/src/presentation/http/routes/internal/events/refresh.rs index 9acf19e693..679d4c826f 100644 --- a/api/src/presentation/http/routes/internal/events/refresh.rs +++ b/api/src/presentation/http/routes/internal/events/refresh.rs @@ -16,7 +16,7 @@ use rocket::State; use thiserror::Error; use uuid::Uuid; -use crate::domain::projectors::projections; +use crate::domain::projectors::projections_for_refresh; #[derive(Debug, Error)] enum Error { @@ -57,11 +57,11 @@ pub async fn refresh( budget_event_store: &State>>, application_event_store: &State>>, payment_event_store: &State>>, - projector: &State, + projector_for_refresh: &State, ) -> Result<(), HttpApiProblem> { let mut registry = Registry::new(); - let projector = Arc::new((*projector).clone()); + let projector = Arc::new((*projector_for_refresh).clone()); Refresher::new((*project_event_store).clone(), projector.clone()).register(&mut registry)?; Refresher::new((*application_event_store).clone(), projector.clone())