Skip to content

Commit

Permalink
Add PriceTracker to Price Oracle (#403)
Browse files Browse the repository at this point in the history
* Add PriceTracker to file_store

Update price tracker and have it fail it unable to get initial prices

appease clippy

Update PriceTracker to be in price oracle

Remove unused Error

Refactor to use more streaming

* Fix clippy warning

* Renamed get_initial_prices to calculate_initial_prices

* Update price/src/price_tracker.rs

Co-authored-by: Matthew Plant <matty@nova-labs.com>

---------

Co-authored-by: Matthew Plant <matty@nova-labs.com>
  • Loading branch information
bbalser and Matthew Plant authored Mar 13, 2023
1 parent ad47968 commit bdcbd5f
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 1 deletion.
2 changes: 1 addition & 1 deletion iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl iot_config::Gateway for GatewayService {
.map_err(|_| Status::internal(format!("error retrieving gateway {gateway_address}")))
.and_then(|info| {
info.location
.ok_or(Status::not_found(format!("{gateway_address} not asserted")))
.ok_or_else(|| Status::not_found(format!("{gateway_address} not asserted")))
})?;

let location = Cell::from_raw(location)
Expand Down
2 changes: 2 additions & 0 deletions price/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod error;
pub mod metrics;
pub mod price_generator;
mod price_tracker;
pub mod settings;

pub use error::PriceError;
pub use price_generator::PriceGenerator;
pub use price_tracker::PriceTracker;
pub use settings::Settings;
133 changes: 133 additions & 0 deletions price/src/price_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use anyhow::{anyhow, Error, Result};
use chrono::{DateTime, Duration, Utc};
use file_store::{FileInfo, FileStore, FileType, Settings};
use futures::stream::{StreamExt, TryStreamExt};
use helium_proto::{BlockchainTokenTypeV1, Message, PriceReportV1};
use std::collections::HashMap;
use tokio::sync::watch;

type Prices = HashMap<BlockchainTokenTypeV1, u64>;

#[derive(Clone)]
pub struct PriceTracker {
receiver: watch::Receiver<Prices>,
}

impl PriceTracker {
pub async fn start(
settings: &Settings,
shutdown: triggered::Listener,
) -> Result<(Self, impl std::future::Future<Output = Result<()>>)> {
let file_store = FileStore::from_settings(settings).await?;
let (sender, receiver) = watch::channel(Prices::new());
let initial_timestamp = calculate_initial_prices(&file_store, &sender).await?;

let handle =
tokio::spawn(async move { run(file_store, sender, initial_timestamp, shutdown).await });

Ok((PriceTracker { receiver }, async move {
match handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
}
}))
}

pub async fn price(&self, token_type: &BlockchainTokenTypeV1) -> Result<u64> {
self.receiver
.borrow()
.get(token_type)
.cloned()
.ok_or_else(|| anyhow!("price not available"))
}
}

async fn run(
file_store: FileStore,
sender: watch::Sender<Prices>,
mut after: DateTime<Utc>,
shutdown: triggered::Listener,
) -> Result<()> {
let mut trigger = tokio::time::interval(std::time::Duration::from_secs(30));

loop {
let shutdown = shutdown.clone();

tokio::select! {
_ = shutdown => {
tracing::info!("PriceTracker: shutting down");
break;
}
_ = trigger.tick() => {
let timestamp = process_files(&file_store, &sender, after).await?;
after = timestamp.unwrap_or(after);
}
}
}

Ok(())
}

async fn calculate_initial_prices(
file_store: &FileStore,
sender: &watch::Sender<Prices>,
) -> Result<DateTime<Utc>> {
tracing::debug!("PriceTracker: Updating initial prices");
for duration in [Duration::minutes(10), Duration::hours(4)] {
let timestamp = process_files(file_store, sender, Utc::now() - duration).await?;

if timestamp.is_some() {
return Ok(timestamp.unwrap());
}
}

Err(anyhow!("price not available"))
}

async fn process_files(
file_store: &FileStore,
sender: &watch::Sender<Prices>,
after: DateTime<Utc>,
) -> Result<Option<DateTime<Utc>>> {
file_store
.list(FileType::PriceReport, after, None)
.map_err(Error::from)
.and_then(|file| process_file(file_store, file, sender))
.try_fold(None, |_old, ts| async move { Ok(Some(ts)) })
.await
}

async fn process_file(
file_store: &FileStore,
file: FileInfo,
sender: &watch::Sender<Prices>,
) -> Result<DateTime<Utc>> {
tracing::debug!("PriceTracker: processing pricing report file {}", file.key);
let timestamp = file.timestamp;

file_store
.stream_file(file)
.await?
.map_err(Error::from)
.and_then(|buf| async { PriceReportV1::decode(buf).map_err(Error::from) })
.map_err(|err| {
tracing::warn!("PriceTracker: skipping price report due to error {err:?}");
err
})
.filter_map(|result| async { result.ok() })
.for_each(|report| async move {
sender.send_if_modified(
|prices: &mut Prices| match prices.get(&report.token_type()) {
Some(price) if price == &report.price => false,
_ => {
prices.insert(report.token_type(), report.price);
true
}
},
);
})
.await;

Ok(timestamp)
}

0 comments on commit bdcbd5f

Please sign in to comment.