Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper job management #8

Merged
merged 7 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ version = "1.0.0"
async-trait = "0.1.56"
futures = { version = "0.3.21", features = ["alloc"] }
tokio = { version = "1.18.2", features = ["full"] }
tokio-cron-scheduler = "0.9.4"

# Database
diesel = { version = "2.1.0", default-features = false, features = [
Expand Down
16 changes: 3 additions & 13 deletions api/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use anyhow::{Context, Error, Result};
use api::{presentation::bootstrap, Config};
use api::{presentation::*, Config};
use domain::LogErr;
use dotenv::dotenv;
use futures::future::try_join_all;
use infrastructure::{config, tracing::Tracer};
use olog::{error, info, IntoField};
use tokio::join;

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -16,17 +14,9 @@
let _tracer =
Tracer::init(config.tracer.clone(), "api").context("Tracer initialization")?;

let (http_server, event_listeners, cron) =
bootstrap(config).await.context("App bootstrap")?;
cron::bootstrap(config.clone()).await?.start().await?;

Check warning on line 17 in api/src/main.rs

View check run for this annotation

Codecov / codecov/patch

api/src/main.rs#L17

Added line #L17 was not covered by tests

let (http_server, event_listeners, _) = join!(
http_server.launch(),
try_join_all(event_listeners),
cron.run()
);

let _ = http_server.context("App run")?;
event_listeners.context("event-listeners")?;
let _ = http::bootstrap(config.clone()).await?.launch().await?;

Check warning on line 19 in api/src/main.rs

View check run for this annotation

Codecov / codecov/patch

api/src/main.rs#L19

Added line #L19 was not covered by tests

info!("👋 Gracefully shut down");

Expand Down
11 changes: 8 additions & 3 deletions api/src/presentation/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use anyhow::Result;
use tokio_cron_scheduler::JobScheduler;

use crate::{presentation::event_listeners, Config};

use crate::Config;
pub mod quotes_syncer;

pub fn bootstrap(config: Config) -> Result<quotes_syncer::Cron> {
quotes_syncer::bootstrap(config)
pub async fn bootstrap(config: Config) -> Result<JobScheduler> {
let scheduler = JobScheduler::new().await?;
scheduler.add(quotes_syncer::bootstrap(config.clone()).await?).await?;
scheduler.add(event_listeners::bootstrap(config.clone()).await?).await?;
Ok(scheduler)

Check warning on line 12 in api/src/presentation/cron/mod.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/cron/mod.rs#L8-L12

Added lines #L8 - L12 were not covered by tests
}
56 changes: 21 additions & 35 deletions api/src/presentation/cron/quotes_syncer.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use domain::currencies;
use infrastructure::{coinmarketcap, database};
use olog::{error, info, IntoField};
use olog::info;
use tokio::time::Instant;
use tokio_cron_scheduler::Job;

use crate::{application::quotes::sync::Usecase, Config};

pub fn bootstrap(config: Config) -> Result<Cron> {
info!("Bootstrapping quotes_syncer");
pub async fn bootstrap(config: Config) -> Result<Job> {
let job = Job::new_repeated_async(sleep_duration(), move |_id, _lock| {
let cloned_config = config.clone();
Box::pin(async move { _bootstrap(cloned_config.clone()).await.unwrap() })
})?;

Ok(job)
}

async fn _bootstrap(config: Config) -> Result<()> {
let database = Arc::new(database::Client::new(database::init_pool(
config.database.clone(),
)?));
Expand All @@ -22,37 +28,17 @@ pub fn bootstrap(config: Config) -> Result<Cron> {
currencies::USD,
));

Ok(Cron {
usecase: Usecase::new(database, coinmarketcap),
})
}
let start = Instant::now();

pub struct Cron {
usecase: Usecase,
}
let count = Usecase::new(database, coinmarketcap).sync_quotes().await?;

impl Cron {
pub async fn run_once(&self) {
let start = Instant::now();
match self.usecase.sync_quotes().await {
Ok(count) => info!(
duration = start.elapsed().as_secs(),
count = count,
"💸 Crypto currencies prices synced"
),
Err(error) => error!(
error = error.to_field(),
"Failed while syncing crypto currencies prices"
),
}
}
info!(
duration = start.elapsed().as_secs(),
count = count,
"💸 Crypto currencies prices synced"
);

pub async fn run(&self) {
loop {
self.run_once().await;
tokio::time::sleep(sleep_duration()).await;
}
}
Ok(())
}

fn sleep_duration() -> Duration {
Expand Down
31 changes: 27 additions & 4 deletions api/src/presentation/event_listeners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,41 @@
pub mod quote_syncer;
pub mod webhook;

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use domain::{currencies, EventListener, LogErr, Message, Subscriber, SubscriberCallbackError};
use futures::future::try_join_all;
use infrastructure::{amqp::UniqueMessage, coinmarketcap, database, event_bus};
use olog::IntoField;
use olog::{error, IntoField};
use tokio::task::JoinHandle;
use tokio_cron_scheduler::Job;
use tokio_retry::{strategy::FixedInterval, Retry};
use url::{ParseError, Url};
use webhook::EventWebHook;

use self::logger::Logger;
use crate::Config;

pub async fn bootstrap(config: Config) -> Result<Vec<JoinHandle<()>>> {
pub async fn bootstrap(config: Config) -> Result<Job> {
Ok(Job::new_one_shot_async(
Duration::ZERO,
move |_id, _lock| {
let cloned_config = config.clone();
Box::pin(async move {
Retry::spawn(FixedInterval::from_millis(5000), || async {
_bootstrap(cloned_config.clone()).await.log_err(|e| {
error!(error = e.to_field(), "Error in event listeners bootstrap")
})
})
.await
.unwrap()
})
},
)?)
}

Check warning on line 37 in api/src/presentation/event_listeners/mod.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/event_listeners/mod.rs#L21-L37

Added lines #L21 - L37 were not covered by tests

pub async fn _bootstrap(config: Config) -> Result<()> {
info!("Bootstrapping event listeners");
let reqwest = reqwest::Client::new();
let database = Arc::new(database::Client::new(database::init_pool(
Expand All @@ -26,7 +47,9 @@
currencies::USD,
));

spawn_all(config, reqwest, database, coinmarketcap).await
let listeners = spawn_all(config.clone(), reqwest, database, coinmarketcap).await?;
try_join_all(listeners).await?;
Ok(())

Check warning on line 52 in api/src/presentation/event_listeners/mod.rs

View check run for this annotation

Codecov / codecov/patch

api/src/presentation/event_listeners/mod.rs#L52

Added line #L52 was not covered by tests
}

pub async fn spawn_all(
Expand Down
2 changes: 1 addition & 1 deletion api/src/presentation/event_listeners/quote_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl EventListener<Event> for Projector {
.quote_service
.fetch_conversion_rate(currency)
.await
.map_err(SubscriberCallbackError::Discard)?;
.map_err(SubscriberCallbackError::Fatal)?;

self.quotes_repository.insert(CryptoUsdQuote {
currency: code,
Expand Down
20 changes: 0 additions & 20 deletions api/src/presentation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,4 @@
use anyhow::Result;
use rocket::{Build, Rocket};
use tokio::task::JoinHandle;

use crate::Config;

pub mod cron;
pub mod event_listeners;
pub mod graphql;
pub mod http;

pub async fn bootstrap(
config: Config,
) -> Result<(
Rocket<Build>,
Vec<JoinHandle<()>>,
cron::quotes_syncer::Cron,
)> {
Ok((
http::bootstrap(config.clone()).await?,
event_listeners::bootstrap(config.clone()).await?,
cron::bootstrap(config)?,
))
}
11 changes: 7 additions & 4 deletions api/tests/context/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ pub struct Context {
impl Context {
pub fn new() -> Self {
Self {
_guards: vec![set_env(
OsString::from("HASURA_GRAPHQL_JWT_SECRET"),
r#"{"type":"HS256","key":"secret","issuer":"hasura-auth-unit-tests"}"#,
)],
_guards: vec![
set_env(
OsString::from("HASURA_GRAPHQL_JWT_SECRET"),
r#"{"type":"HS256","key":"secret","issuer":"hasura-auth-unit-tests"}"#,
),
set_env(OsString::from("QUOTES_SYNCER_SLEEP_DURATION"), "0"),
],
}
}
}
24 changes: 15 additions & 9 deletions api/tests/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{env, sync::Arc};
use anyhow::Result;
use api::{
domain::projectors::{self, projections},
presentation::bootstrap,
Config,
};
use domain::{CompositePublisher, Event, EventPublisher, Publisher};
Expand All @@ -14,6 +13,7 @@ use rstest::fixture;
use testcontainers::clients::Cli;
use testing::context::{amqp, coinmarketcap, database, github};
use tokio::task::JoinHandle;
use tokio_cron_scheduler::Job;

pub mod environment;
pub mod indexer;
Expand All @@ -39,16 +39,18 @@ pub struct Context<'a> {
pub indexer: indexer::Context<'a>,
pub web3: web3::Context<'a>,
pub coinmarketcap: coinmarketcap::Context<'a>,
pub quotes_syncer: api::presentation::cron::quotes_syncer::Cron,
pub quotes_syncer: Job,
pub event_publisher: Arc<dyn Publisher<Event>>,
_event_listeners: Vec<JoinHandle<()>>,
_event_listeners: JoinHandle<Result<()>>,
_environment: environment::Context,
}

impl<'a> Context<'a> {
pub async fn new(docker: &'a Cli) -> Result<Context<'a>> {
tracing_subscriber::fmt::init();

let environment = environment::Context::new();

let database = database::Context::new(docker)?;
let amqp = amqp::Context::new(docker, vec![], vec![EXCHANGE_NAME]).await?;
let simple_storage = simple_storage::Context::new(docker)?;
Expand Down Expand Up @@ -111,6 +113,10 @@ impl<'a> Context<'a> {
coinmarketcap: coinmarketcap.config.clone(),
};

let event_listeners = tokio::spawn(api::presentation::event_listeners::_bootstrap(
config.clone(),
));

let event_publisher = CompositePublisher::new(vec![
Arc::new(EventPublisher::new(
projectors::event_store::Projector::new(database.client.clone()),
Expand All @@ -133,10 +139,9 @@ impl<'a> Context<'a> {
))),
]);

let (http_server, event_listeners, cron) = bootstrap(config.clone()).await?;

Ok(Self {
http_client: Client::tracked(http_server).await?,
http_client: Client::tracked(api::presentation::http::bootstrap(config.clone()).await?)
.await?,
database,
amqp,
simple_storage,
Expand All @@ -145,16 +150,17 @@ impl<'a> Context<'a> {
indexer,
web3,
coinmarketcap,
quotes_syncer: cron,
quotes_syncer: api::presentation::cron::quotes_syncer::bootstrap(config.clone())
.await?,
event_publisher: Arc::new(event_publisher),
_event_listeners: event_listeners,
_environment: environment::Context::new(),
_environment: environment,
})
}
}

impl<'a> Drop for Context<'a> {
fn drop(&mut self) {
self._event_listeners.iter().for_each(JoinHandle::abort);
self._event_listeners.abort();
}
}
Loading