From f2712b860c769048fc79e98dafbd826a03e1cf62 Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 17:22:49 +0200 Subject: [PATCH 1/7] Remove connection retries in amqp --- common/infrastructure/src/amqp/bus/mod.rs | 24 ++--------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/common/infrastructure/src/amqp/bus/mod.rs b/common/infrastructure/src/amqp/bus/mod.rs index cc53b01d17..5c7c9efaf1 100644 --- a/common/infrastructure/src/amqp/bus/mod.rs +++ b/common/infrastructure/src/amqp/bus/mod.rs @@ -7,10 +7,9 @@ use lapin::{ publisher_confirm::Confirmation, BasicProperties, Channel, Connection, Consumer, }; -use olog::{error, IntoField}; +use olog::error; use thiserror::Error; use tokio::sync::{Mutex, RwLock}; -use tokio_retry::{strategy::FixedInterval, Retry}; use tokio_stream::StreamExt; use super::{Config, UniqueMessage}; @@ -195,25 +194,6 @@ async fn connect(config: Config) -> Result, Error> { /// This function actually connects to RabbitMQ and must be called only once async fn _do_connect(config: Config) -> Result { - let retry_strategy = FixedInterval::from_millis(config.connection_retry_interval_ms) - .take(config.connection_retry_count); - - let connection = Retry::spawn(retry_strategy, || async { - Connection::connect(&config.url, Default::default()).await.map_err(|error| { - error!( - error = error.to_field(), - "Failed to connect to RabbitMQ. Retrying in {}ms for a maximum of {} attempts.", - config.connection_retry_interval_ms, - config.connection_retry_count - ); - error - }) - }) - .await?; - connection.on_error(|error| { - error!(error = error.to_field(), "Lost connection to RabbitMQ"); - std::process::exit(1); - }); - + let connection = Connection::connect(&config.url, Default::default()).await?; Ok(connection) } From 0610654fcf8b9106ace674dc37b1d3a790f81b07 Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 17:23:31 +0200 Subject: [PATCH 2/7] Add tokio-cron-scheduler --- Cargo.lock | 38 ++++++++++++++++++++++++++++++++++++++ api/Cargo.toml | 1 + 2 files changed, 39 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index fbea899b69..b1ebe35aa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,6 +215,7 @@ dependencies = [ "testing", "thiserror", "tokio", + "tokio-cron-scheduler", "tokio-retry", "tracing", "tracing-subscriber", @@ -995,6 +996,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff76b51e4c068c52bfd2866e1567bee7c567ae8f24ada09fd4307019e25eab7" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -3025,6 +3037,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -5106,6 +5129,21 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e57fbf1da3f18c8a95469f8973c138b0a99f4ae761885c3646b0c61139b0522" +[[package]] +name = "tokio-cron-scheduler" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de2c1fd54a857b29c6cd1846f31903d0ae8e28175615c14a277aed45c58d8e27" +dependencies = [ + "chrono", + "cron", + "num-derive", + "num-traits", + "tokio", + "tracing", + "uuid 1.3.3", +] + [[package]] name = "tokio-macros" version = "2.1.0" diff --git a/api/Cargo.toml b/api/Cargo.toml index d5479c0d6d..17a208410f 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -16,6 +16,7 @@ version = "1.0.0" async-trait = "0.1.56" futures = { version = "0.3.21", features = ["alloc"] } tokio = { version = "1.18.2", features = ["full"] } +tokio-cron-scheduler = "0.9.4" # Database diesel = { version = "2.1.0", default-features = false, features = [ From 7a65e353061e9ec934d6c66f48f4995d51fe200d Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 17:24:11 +0200 Subject: [PATCH 3/7] Turn cron and event listeners bootstrap into jobs --- api/src/presentation/cron/quotes_syncer.rs | 56 ++++++++------------- api/src/presentation/event_listeners/mod.rs | 31 ++++++++++-- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/api/src/presentation/cron/quotes_syncer.rs b/api/src/presentation/cron/quotes_syncer.rs index 3f924b02b2..c61f39f88b 100644 --- a/api/src/presentation/cron/quotes_syncer.rs +++ b/api/src/presentation/cron/quotes_syncer.rs @@ -1,18 +1,24 @@ -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Duration}; use anyhow::Result; use domain::currencies; use infrastructure::{coinmarketcap, database}; -use olog::{error, info, IntoField}; +use olog::info; +use tokio::time::Instant; +use tokio_cron_scheduler::Job; use crate::{application::quotes::sync::Usecase, Config}; -pub fn bootstrap(config: Config) -> Result { - info!("Bootstrapping quotes_syncer"); +pub async fn bootstrap(config: Config) -> Result { + let job = Job::new_repeated_async(sleep_duration(), move |_id, _lock| { + let cloned_config = config.clone(); + Box::pin(async move { _bootstrap(cloned_config.clone()).await.unwrap() }) + })?; + Ok(job) +} + +async fn _bootstrap(config: Config) -> Result<()> { let database = Arc::new(database::Client::new(database::init_pool( config.database.clone(), )?)); @@ -22,37 +28,17 @@ pub fn bootstrap(config: Config) -> Result { currencies::USD, )); - Ok(Cron { - usecase: Usecase::new(database, coinmarketcap), - }) -} + let start = Instant::now(); -pub struct Cron { - usecase: Usecase, -} + let count = Usecase::new(database, coinmarketcap).sync_quotes().await?; -impl Cron { - pub async fn run_once(&self) { - let start = Instant::now(); - match self.usecase.sync_quotes().await { - Ok(count) => info!( - duration = start.elapsed().as_secs(), - count = count, - "💸 Crypto currencies prices synced" - ), - Err(error) => error!( - error = error.to_field(), - "Failed while syncing crypto currencies prices" - ), - } - } + info!( + duration = start.elapsed().as_secs(), + count = count, + "💸 Crypto currencies prices synced" + ); - pub async fn run(&self) { - loop { - self.run_once().await; - tokio::time::sleep(sleep_duration()).await; - } - } + Ok(()) } fn sleep_duration() -> Duration { diff --git a/api/src/presentation/event_listeners/mod.rs b/api/src/presentation/event_listeners/mod.rs index 96cad2f453..cc272e34da 100644 --- a/api/src/presentation/event_listeners/mod.rs +++ b/api/src/presentation/event_listeners/mod.rs @@ -2,20 +2,41 @@ pub mod logger; pub mod quote_syncer; pub mod webhook; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use anyhow::Result; use domain::{currencies, EventListener, LogErr, Message, Subscriber, SubscriberCallbackError}; +use futures::future::try_join_all; use infrastructure::{amqp::UniqueMessage, coinmarketcap, database, event_bus}; -use olog::IntoField; +use olog::{error, IntoField}; use tokio::task::JoinHandle; +use tokio_cron_scheduler::Job; +use tokio_retry::{strategy::FixedInterval, Retry}; use url::{ParseError, Url}; use webhook::EventWebHook; use self::logger::Logger; use crate::Config; -pub async fn bootstrap(config: Config) -> Result>> { +pub async fn bootstrap(config: Config) -> Result { + Ok(Job::new_one_shot_async( + Duration::ZERO, + move |_id, _lock| { + let cloned_config = config.clone(); + Box::pin(async move { + Retry::spawn(FixedInterval::from_millis(5000), || async { + _bootstrap(cloned_config.clone()).await.log_err(|e| { + error!(error = e.to_field(), "Error in event listeners bootstrap") + }) + }) + .await + .unwrap() + }) + }, + )?) +} + +pub async fn _bootstrap(config: Config) -> Result<()> { info!("Bootstrapping event listeners"); let reqwest = reqwest::Client::new(); let database = Arc::new(database::Client::new(database::init_pool( @@ -26,7 +47,9 @@ pub async fn bootstrap(config: Config) -> Result>> { currencies::USD, )); - spawn_all(config, reqwest, database, coinmarketcap).await + let listeners = spawn_all(config.clone(), reqwest, database, coinmarketcap).await?; + try_join_all(listeners).await?; + Ok(()) } pub async fn spawn_all( From ec802a17ff9304ee3d395bc5ad0736e44da492d5 Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 17:25:10 +0200 Subject: [PATCH 4/7] run job scheduler in main --- api/src/main.rs | 16 +++------------- api/src/presentation/cron/mod.rs | 11 ++++++++--- api/src/presentation/mod.rs | 20 -------------------- 3 files changed, 11 insertions(+), 36 deletions(-) diff --git a/api/src/main.rs b/api/src/main.rs index 67aed62b24..5e1e9425bc 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -1,11 +1,9 @@ use anyhow::{Context, Error, Result}; -use api::{presentation::bootstrap, Config}; +use api::{presentation::*, Config}; use domain::LogErr; use dotenv::dotenv; -use futures::future::try_join_all; use infrastructure::{config, tracing::Tracer}; use olog::{error, info, IntoField}; -use tokio::join; #[tokio::main] async fn main() -> Result<()> { @@ -16,17 +14,9 @@ async fn main() -> Result<()> { let _tracer = Tracer::init(config.tracer.clone(), "api").context("Tracer initialization")?; - let (http_server, event_listeners, cron) = - bootstrap(config).await.context("App bootstrap")?; + cron::bootstrap(config.clone()).await?.start().await?; - let (http_server, event_listeners, _) = join!( - http_server.launch(), - try_join_all(event_listeners), - cron.run() - ); - - let _ = http_server.context("App run")?; - event_listeners.context("event-listeners")?; + let _ = http::bootstrap(config.clone()).await?.launch().await?; info!("👋 Gracefully shut down"); diff --git a/api/src/presentation/cron/mod.rs b/api/src/presentation/cron/mod.rs index c8c826dec3..6150a9098f 100644 --- a/api/src/presentation/cron/mod.rs +++ b/api/src/presentation/cron/mod.rs @@ -1,8 +1,13 @@ use anyhow::Result; +use tokio_cron_scheduler::JobScheduler; + +use crate::{presentation::event_listeners, Config}; -use crate::Config; pub mod quotes_syncer; -pub fn bootstrap(config: Config) -> Result { - quotes_syncer::bootstrap(config) +pub async fn bootstrap(config: Config) -> Result { + let scheduler = JobScheduler::new().await?; + scheduler.add(quotes_syncer::bootstrap(config.clone()).await?).await?; + scheduler.add(event_listeners::bootstrap(config.clone()).await?).await?; + Ok(scheduler) } diff --git a/api/src/presentation/mod.rs b/api/src/presentation/mod.rs index 8aaf469c05..75a45c0765 100644 --- a/api/src/presentation/mod.rs +++ b/api/src/presentation/mod.rs @@ -1,24 +1,4 @@ -use anyhow::Result; -use rocket::{Build, Rocket}; -use tokio::task::JoinHandle; - -use crate::Config; - pub mod cron; pub mod event_listeners; pub mod graphql; pub mod http; - -pub async fn bootstrap( - config: Config, -) -> Result<( - Rocket, - Vec>, - cron::quotes_syncer::Cron, -)> { - Ok(( - http::bootstrap(config.clone()).await?, - event_listeners::bootstrap(config.clone()).await?, - cron::bootstrap(config)?, - )) -} From 9c16b68bebc23a37645f55f1bfd8c8fe5965f898 Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 18:19:59 +0200 Subject: [PATCH 5/7] Force reconnection to rabbitmq if current connection is errored --- common/infrastructure/src/amqp/bus/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/infrastructure/src/amqp/bus/mod.rs b/common/infrastructure/src/amqp/bus/mod.rs index 5c7c9efaf1..1915ac3954 100644 --- a/common/infrastructure/src/amqp/bus/mod.rs +++ b/common/infrastructure/src/amqp/bus/mod.rs @@ -183,8 +183,8 @@ lazy_static! { async fn connect(config: Config) -> Result, Error> { let mut guard = CONNECTION.lock().await; match guard.as_ref().and_then(Weak::upgrade) { - Some(connection) => Ok(connection), - None => { + Some(connection) if connection.status().connected() => Ok(connection), + _ => { let connection = Arc::new(_do_connect(config).await?); *guard = Some(Arc::downgrade(&connection)); Ok(connection) From 36263e301416db18ed1d8cf61977c15c0b12e758 Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 18:26:42 +0200 Subject: [PATCH 6/7] Retry events when crypto quote fetch fail --- api/src/presentation/event_listeners/quote_syncer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/presentation/event_listeners/quote_syncer.rs b/api/src/presentation/event_listeners/quote_syncer.rs index 49a6eff4d4..0388080847 100644 --- a/api/src/presentation/event_listeners/quote_syncer.rs +++ b/api/src/presentation/event_listeners/quote_syncer.rs @@ -31,7 +31,7 @@ impl EventListener for Projector { .quote_service .fetch_conversion_rate(currency) .await - .map_err(SubscriberCallbackError::Discard)?; + .map_err(SubscriberCallbackError::Fatal)?; self.quotes_repository.insert(CryptoUsdQuote { currency: code, From d1013d80b598f010d117c1e9b168725b24244f49 Mon Sep 17 00:00:00 2001 From: Anthony Buisset Date: Wed, 27 Sep 2023 22:31:01 +0200 Subject: [PATCH 7/7] fix integration tests --- api/tests/context/environment.rs | 11 ++++++---- api/tests/context/mod.rs | 24 ++++++++++++-------- api/tests/crypto_quotes_sync_it.rs | 8 +++++-- api/tests/fixtures/job.rs | 35 ++++++++++++++++++++++++++++++ api/tests/fixtures/mod.rs | 2 ++ 5 files changed, 65 insertions(+), 15 deletions(-) create mode 100644 api/tests/fixtures/job.rs diff --git a/api/tests/context/environment.rs b/api/tests/context/environment.rs index da8aa8733d..37f8c1798c 100644 --- a/api/tests/context/environment.rs +++ b/api/tests/context/environment.rs @@ -9,10 +9,13 @@ pub struct Context { impl Context { pub fn new() -> Self { Self { - _guards: vec![set_env( - OsString::from("HASURA_GRAPHQL_JWT_SECRET"), - r#"{"type":"HS256","key":"secret","issuer":"hasura-auth-unit-tests"}"#, - )], + _guards: vec![ + set_env( + OsString::from("HASURA_GRAPHQL_JWT_SECRET"), + r#"{"type":"HS256","key":"secret","issuer":"hasura-auth-unit-tests"}"#, + ), + set_env(OsString::from("QUOTES_SYNCER_SLEEP_DURATION"), "0"), + ], } } } diff --git a/api/tests/context/mod.rs b/api/tests/context/mod.rs index b8300d7fe9..305b105fa3 100644 --- a/api/tests/context/mod.rs +++ b/api/tests/context/mod.rs @@ -3,7 +3,6 @@ use std::{env, sync::Arc}; use anyhow::Result; use api::{ domain::projectors::{self, projections}, - presentation::bootstrap, Config, }; use domain::{CompositePublisher, Event, EventPublisher, Publisher}; @@ -14,6 +13,7 @@ use rstest::fixture; use testcontainers::clients::Cli; use testing::context::{amqp, coinmarketcap, database, github}; use tokio::task::JoinHandle; +use tokio_cron_scheduler::Job; pub mod environment; pub mod indexer; @@ -39,9 +39,9 @@ pub struct Context<'a> { pub indexer: indexer::Context<'a>, pub web3: web3::Context<'a>, pub coinmarketcap: coinmarketcap::Context<'a>, - pub quotes_syncer: api::presentation::cron::quotes_syncer::Cron, + pub quotes_syncer: Job, pub event_publisher: Arc>, - _event_listeners: Vec>, + _event_listeners: JoinHandle>, _environment: environment::Context, } @@ -49,6 +49,8 @@ impl<'a> Context<'a> { pub async fn new(docker: &'a Cli) -> Result> { tracing_subscriber::fmt::init(); + let environment = environment::Context::new(); + let database = database::Context::new(docker)?; let amqp = amqp::Context::new(docker, vec![], vec![EXCHANGE_NAME]).await?; let simple_storage = simple_storage::Context::new(docker)?; @@ -111,6 +113,10 @@ impl<'a> Context<'a> { coinmarketcap: coinmarketcap.config.clone(), }; + let event_listeners = tokio::spawn(api::presentation::event_listeners::_bootstrap( + config.clone(), + )); + let event_publisher = CompositePublisher::new(vec![ Arc::new(EventPublisher::new( projectors::event_store::Projector::new(database.client.clone()), @@ -133,10 +139,9 @@ impl<'a> Context<'a> { ))), ]); - let (http_server, event_listeners, cron) = bootstrap(config.clone()).await?; - Ok(Self { - http_client: Client::tracked(http_server).await?, + http_client: Client::tracked(api::presentation::http::bootstrap(config.clone()).await?) + .await?, database, amqp, simple_storage, @@ -145,16 +150,17 @@ impl<'a> Context<'a> { indexer, web3, coinmarketcap, - quotes_syncer: cron, + quotes_syncer: api::presentation::cron::quotes_syncer::bootstrap(config.clone()) + .await?, event_publisher: Arc::new(event_publisher), _event_listeners: event_listeners, - _environment: environment::Context::new(), + _environment: environment, }) } } impl<'a> Drop for Context<'a> { fn drop(&mut self) { - self._event_listeners.iter().for_each(JoinHandle::abort); + self._event_listeners.abort(); } } diff --git a/api/tests/crypto_quotes_sync_it.rs b/api/tests/crypto_quotes_sync_it.rs index 02d7f7da97..48f07b50af 100644 --- a/api/tests/crypto_quotes_sync_it.rs +++ b/api/tests/crypto_quotes_sync_it.rs @@ -1,4 +1,5 @@ mod context; +mod fixtures; mod models; use anyhow::Result; @@ -12,7 +13,10 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use testcontainers::clients::Cli; -use crate::context::{docker, Context}; +use crate::{ + context::{docker, Context}, + fixtures::job::Runnable, +}; #[macro_use] extern crate diesel; @@ -58,7 +62,7 @@ impl<'a> Test<'a> { // When let before = Utc::now().naive_utc(); - self.context.quotes_syncer.run_once().await; + self.context.quotes_syncer.run().await?; let after = Utc::now().naive_utc(); // Then diff --git a/api/tests/fixtures/job.rs b/api/tests/fixtures/job.rs new file mode 100644 index 0000000000..1f630e10bf --- /dev/null +++ b/api/tests/fixtures/job.rs @@ -0,0 +1,35 @@ +use anyhow::Result; +use async_trait::async_trait; +use rocket::http::hyper::body::HttpBody; +use tokio::sync::mpsc; +use tokio_cron_scheduler::{Job, JobScheduler, OnJobNotification}; + +#[async_trait::async_trait] +pub trait Runnable { + async fn run(&self) -> Result<()>; +} + +#[async_trait] +impl Runnable for Job { + async fn run(&self) -> Result<()> { + let mut job = self.clone(); + let mut scheduler = JobScheduler::new().await?; + let (tx, mut rx) = mpsc::unbounded_channel(); + + let on_job_done: Box = Box::new(move |_, _, _| { + let cloned_tx = tx.clone(); + Box::pin(async move { + cloned_tx.clone().send(()).unwrap(); + }) + }); + + job.on_done_notification_add(&scheduler, on_job_done).await?; + + scheduler.add(job).await?; + scheduler.start().await?; + rx.recv().await; // Wait for job completion + scheduler.shutdown().await?; + + Ok(()) + } +} diff --git a/api/tests/fixtures/mod.rs b/api/tests/fixtures/mod.rs index af84481c56..4c03c9f01a 100644 --- a/api/tests/fixtures/mod.rs +++ b/api/tests/fixtures/mod.rs @@ -1,5 +1,7 @@ #![allow(unused)] +pub mod job; + use std::time::Duration; pub async fn retry std::result::Result, E>, C: FnMut(&[R]) -> bool>(