Skip to content

Commit

Permalink
put task killer back in
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Aug 3, 2023
1 parent 3032cb6 commit b78f22c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
8 changes: 5 additions & 3 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ impl Server {
// *
// setup the price tracker requirements
// *
let (price_tracker, price_sender) = PriceTracker::new(&settings.price_tracker).await?;

let price_daemon = PriceTrackerDaemon::new(&settings.price_tracker, price_sender).await?;
let (price_tracker, price_sender, task_killer_receiver) =
PriceTracker::new(&settings.price_tracker).await?;
let price_daemon =
PriceTrackerDaemon::new(&settings.price_tracker, price_sender, task_killer_receiver)
.await?;

// *
// setup the loader requirements
Expand Down
8 changes: 5 additions & 3 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ impl Cmd {
// *
// setup the price tracker requirements
// *
let (price_tracker, price_sender) = PriceTracker::new(&settings.price_tracker).await?;

let price_daemon = PriceTrackerDaemon::new(&settings.price_tracker, price_sender).await?;
let (price_tracker, price_sender, task_killer_receiver) =
PriceTracker::new(&settings.price_tracker).await?;
let price_daemon =
PriceTrackerDaemon::new(&settings.price_tracker, price_sender, task_killer_receiver)
.await?;

// Heartbeats
let (heartbeats, heartbeats_ingest_server) =
Expand Down
20 changes: 18 additions & 2 deletions price/src/price_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use chrono::{DateTime, Duration, TimeZone, Utc};
use file_store::{FileInfo, FileStore, FileType};
use futures::{
Expand Down Expand Up @@ -69,18 +70,23 @@ impl Settings {
pub struct PriceTracker {
price_duration: Duration,
price_receiver: watch::Receiver<Prices>,
task_killer: mpsc::Sender<String>,
}

impl PriceTracker {
pub async fn new(settings: &Settings) -> anyhow::Result<(Self, watch::Sender<Prices>)> {
pub async fn new(
settings: &Settings,
) -> anyhow::Result<(Self, watch::Sender<Prices>, mpsc::Receiver<String>)> {
let (price_sender, price_receiver) = watch::channel(Prices::new());

let (task_kill_sender, task_kill_receiver) = mpsc::channel(1);
Ok((
Self {
price_duration: settings.price_duration(),
price_receiver,
task_killer: task_kill_sender,
},
price_sender,
task_kill_receiver,
))
}

Expand All @@ -101,13 +107,18 @@ impl PriceTracker {
}
});

if let Err(error) = &result {
self.task_killer.send(error.to_string()).await?;
}

result
}
}

pub struct PriceTrackerDaemon {
file_store: FileStore,
price_sender: watch::Sender<Prices>,
task_killer: mpsc::Receiver<String>,
after: DateTime<Utc>,
}

Expand All @@ -124,6 +135,7 @@ impl PriceTrackerDaemon {
pub async fn new(
settings: &Settings,
price_sender: watch::Sender<Prices>,
task_killer: mpsc::Receiver<String>,
) -> anyhow::Result<Self> {
let file_store = FileStore::from_settings(&settings.file_store).await?;
let price_duration = settings.price_duration();
Expand All @@ -132,6 +144,7 @@ impl PriceTrackerDaemon {
Ok(Self {
file_store,
price_sender,
task_killer,
after: initial_timestamp,
})
}
Expand All @@ -151,6 +164,9 @@ impl PriceTrackerDaemon {
let timestamp = process_files(&self.file_store, &self.price_sender, self.after).await?;
self.after = timestamp.unwrap_or(self.after);
}
msg = self.task_killer.recv() => if let Some(error) = msg {
return Err(anyhow!(error));
}
}
}

Expand Down

0 comments on commit b78f22c

Please sign in to comment.