From b78f22c68f0798830123e5575594a8e2659432df Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Thu, 3 Aug 2023 17:23:40 +0100 Subject: [PATCH] put task killer back in --- iot_verifier/src/main.rs | 8 +++++--- mobile_verifier/src/cli/server.rs | 8 +++++--- price/src/price_tracker.rs | 20 ++++++++++++++++++-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 1b057bd77..0aef98aff 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -88,9 +88,11 @@ impl Server { // * // setup the price tracker requirements // * - let (price_tracker, price_sender) = PriceTracker::new(&settings.price_tracker).await?; - - let price_daemon = PriceTrackerDaemon::new(&settings.price_tracker, price_sender).await?; + let (price_tracker, price_sender, task_killer_receiver) = + PriceTracker::new(&settings.price_tracker).await?; + let price_daemon = + PriceTrackerDaemon::new(&settings.price_tracker, price_sender, task_killer_receiver) + .await?; // * // setup the loader requirements diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 340cac537..7abd0983d 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -44,9 +44,11 @@ impl Cmd { // * // setup the price tracker requirements // * - let (price_tracker, price_sender) = PriceTracker::new(&settings.price_tracker).await?; - - let price_daemon = PriceTrackerDaemon::new(&settings.price_tracker, price_sender).await?; + let (price_tracker, price_sender, task_killer_receiver) = + PriceTracker::new(&settings.price_tracker).await?; + let price_daemon = + PriceTrackerDaemon::new(&settings.price_tracker, price_sender, task_killer_receiver) + .await?; // Heartbeats let (heartbeats, heartbeats_ingest_server) = diff --git a/price/src/price_tracker.rs b/price/src/price_tracker.rs index fd3adabd0..2efd58d3e 100644 --- a/price/src/price_tracker.rs +++ b/price/src/price_tracker.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use chrono::{DateTime, Duration, TimeZone, Utc}; use file_store::{FileInfo, FileStore, FileType}; use futures::{ @@ -69,18 +70,23 @@ impl Settings { pub struct PriceTracker { price_duration: Duration, price_receiver: watch::Receiver, + task_killer: mpsc::Sender, } impl PriceTracker { - pub async fn new(settings: &Settings) -> anyhow::Result<(Self, watch::Sender)> { + pub async fn new( + settings: &Settings, + ) -> anyhow::Result<(Self, watch::Sender, mpsc::Receiver)> { let (price_sender, price_receiver) = watch::channel(Prices::new()); - + let (task_kill_sender, task_kill_receiver) = mpsc::channel(1); Ok(( Self { price_duration: settings.price_duration(), price_receiver, + task_killer: task_kill_sender, }, price_sender, + task_kill_receiver, )) } @@ -101,6 +107,10 @@ impl PriceTracker { } }); + if let Err(error) = &result { + self.task_killer.send(error.to_string()).await?; + } + result } } @@ -108,6 +118,7 @@ impl PriceTracker { pub struct PriceTrackerDaemon { file_store: FileStore, price_sender: watch::Sender, + task_killer: mpsc::Receiver, after: DateTime, } @@ -124,6 +135,7 @@ impl PriceTrackerDaemon { pub async fn new( settings: &Settings, price_sender: watch::Sender, + task_killer: mpsc::Receiver, ) -> anyhow::Result { let file_store = FileStore::from_settings(&settings.file_store).await?; let price_duration = settings.price_duration(); @@ -132,6 +144,7 @@ impl PriceTrackerDaemon { Ok(Self { file_store, price_sender, + task_killer, after: initial_timestamp, }) } @@ -151,6 +164,9 @@ impl PriceTrackerDaemon { let timestamp = process_files(&self.file_store, &self.price_sender, self.after).await?; self.after = timestamp.unwrap_or(self.after); } + msg = self.task_killer.recv() => if let Some(error) = msg { + return Err(anyhow!(error)); + } } }