Skip to content

Commit

Permalink
fix integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AnthonyBuisset committed Sep 28, 2023
1 parent 36263e3 commit d1013d8
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 15 deletions.
11 changes: 7 additions & 4 deletions api/tests/context/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ pub struct Context {
impl Context {
pub fn new() -> Self {
Self {
_guards: vec![set_env(
OsString::from("HASURA_GRAPHQL_JWT_SECRET"),
r#"{"type":"HS256","key":"secret","issuer":"hasura-auth-unit-tests"}"#,
)],
_guards: vec![
set_env(
OsString::from("HASURA_GRAPHQL_JWT_SECRET"),
r#"{"type":"HS256","key":"secret","issuer":"hasura-auth-unit-tests"}"#,
),
set_env(OsString::from("QUOTES_SYNCER_SLEEP_DURATION"), "0"),
],
}
}
}
24 changes: 15 additions & 9 deletions api/tests/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{env, sync::Arc};
use anyhow::Result;
use api::{
domain::projectors::{self, projections},
presentation::bootstrap,
Config,
};
use domain::{CompositePublisher, Event, EventPublisher, Publisher};
Expand All @@ -14,6 +13,7 @@ use rstest::fixture;
use testcontainers::clients::Cli;
use testing::context::{amqp, coinmarketcap, database, github};
use tokio::task::JoinHandle;
use tokio_cron_scheduler::Job;

pub mod environment;
pub mod indexer;
Expand All @@ -39,16 +39,18 @@ pub struct Context<'a> {
pub indexer: indexer::Context<'a>,
pub web3: web3::Context<'a>,
pub coinmarketcap: coinmarketcap::Context<'a>,
pub quotes_syncer: api::presentation::cron::quotes_syncer::Cron,
pub quotes_syncer: Job,
pub event_publisher: Arc<dyn Publisher<Event>>,
_event_listeners: Vec<JoinHandle<()>>,
_event_listeners: JoinHandle<Result<()>>,
_environment: environment::Context,
}

impl<'a> Context<'a> {
pub async fn new(docker: &'a Cli) -> Result<Context<'a>> {
tracing_subscriber::fmt::init();

let environment = environment::Context::new();

let database = database::Context::new(docker)?;
let amqp = amqp::Context::new(docker, vec![], vec![EXCHANGE_NAME]).await?;
let simple_storage = simple_storage::Context::new(docker)?;
Expand Down Expand Up @@ -111,6 +113,10 @@ impl<'a> Context<'a> {
coinmarketcap: coinmarketcap.config.clone(),
};

let event_listeners = tokio::spawn(api::presentation::event_listeners::_bootstrap(
config.clone(),
));

let event_publisher = CompositePublisher::new(vec![
Arc::new(EventPublisher::new(
projectors::event_store::Projector::new(database.client.clone()),
Expand All @@ -133,10 +139,9 @@ impl<'a> Context<'a> {
))),
]);

let (http_server, event_listeners, cron) = bootstrap(config.clone()).await?;

Ok(Self {
http_client: Client::tracked(http_server).await?,
http_client: Client::tracked(api::presentation::http::bootstrap(config.clone()).await?)
.await?,
database,
amqp,
simple_storage,
Expand All @@ -145,16 +150,17 @@ impl<'a> Context<'a> {
indexer,
web3,
coinmarketcap,
quotes_syncer: cron,
quotes_syncer: api::presentation::cron::quotes_syncer::bootstrap(config.clone())
.await?,
event_publisher: Arc::new(event_publisher),
_event_listeners: event_listeners,
_environment: environment::Context::new(),
_environment: environment,
})
}
}

impl<'a> Drop for Context<'a> {
fn drop(&mut self) {
self._event_listeners.iter().for_each(JoinHandle::abort);
self._event_listeners.abort();
}
}
8 changes: 6 additions & 2 deletions api/tests/crypto_quotes_sync_it.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod context;
mod fixtures;
mod models;

use anyhow::Result;
Expand All @@ -12,7 +13,10 @@ use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use testcontainers::clients::Cli;

use crate::context::{docker, Context};
use crate::{
context::{docker, Context},
fixtures::job::Runnable,
};

#[macro_use]
extern crate diesel;
Expand Down Expand Up @@ -58,7 +62,7 @@ impl<'a> Test<'a> {

// When
let before = Utc::now().naive_utc();
self.context.quotes_syncer.run_once().await;
self.context.quotes_syncer.run().await?;
let after = Utc::now().naive_utc();

// Then
Expand Down
35 changes: 35 additions & 0 deletions api/tests/fixtures/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use anyhow::Result;
use async_trait::async_trait;
use rocket::http::hyper::body::HttpBody;
use tokio::sync::mpsc;
use tokio_cron_scheduler::{Job, JobScheduler, OnJobNotification};

#[async_trait::async_trait]
pub trait Runnable {
async fn run(&self) -> Result<()>;
}

#[async_trait]
impl Runnable for Job {
async fn run(&self) -> Result<()> {
let mut job = self.clone();
let mut scheduler = JobScheduler::new().await?;
let (tx, mut rx) = mpsc::unbounded_channel();

let on_job_done: Box<OnJobNotification> = Box::new(move |_, _, _| {
let cloned_tx = tx.clone();
Box::pin(async move {
cloned_tx.clone().send(()).unwrap();
})
});

job.on_done_notification_add(&scheduler, on_job_done).await?;

scheduler.add(job).await?;
scheduler.start().await?;
rx.recv().await; // Wait for job completion
scheduler.shutdown().await?;

Ok(())
}
}
2 changes: 2 additions & 0 deletions api/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(unused)]

pub mod job;

use std::time::Duration;

pub async fn retry<R, E, F: FnMut() -> std::result::Result<Vec<R>, E>, C: FnMut(&[R]) -> bool>(
Expand Down

0 comments on commit d1013d8

Please sign in to comment.