Skip to content

Commit

Permalink
add taskman app, apply usage to each app
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jul 20, 2023
1 parent c5724ca commit 7ee54ef
Show file tree
Hide file tree
Showing 63 changed files with 1,691 additions and 1,508 deletions.
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"reward_index",
"reward_scheduler",
"solana",
"task_manager",
]

[workspace.package]
Expand Down Expand Up @@ -99,6 +100,7 @@ itertools = "*"
data-credits = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"}
helium-sub-daos = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"}
price-oracle = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"}
tokio-util = "0"

[patch.crates-io]
sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb34d3e926347071dbb88d" }
27 changes: 4 additions & 23 deletions db_store/src/iam_auth_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use aws_types::{
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub async fn connect(
settings: &Settings,
shutdown: triggered::Listener,
) -> Result<(Pool<Postgres>, futures::future::BoxFuture<'static, Result>)> {
pub async fn connect(settings: &Settings) -> Result<Pool<Postgres>> {
let aws_config = aws_config::load_from_env().await;
let client = aws_sdk_sts::Client::new(&aws_config);
let connect_parameters = ConnectParameters::try_from(settings)?;
Expand All @@ -28,43 +25,27 @@ pub async fn connect(
.await?;

let cloned_pool = pool.clone();
let join_handle =
tokio::spawn(async move { run(client, connect_parameters, cloned_pool, shutdown).await });

Ok((
pool,
Box::pin(async move {
match join_handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
}
}),
))
tokio::spawn(async move { run(client, connect_parameters, cloned_pool).await });

Ok(pool)
}

async fn run(
client: aws_sdk_sts::Client,
connect_parameters: ConnectParameters,
pool: Pool<Postgres>,
shutdown: triggered::Listener,
) -> Result {
let duration = std::time::Duration::from_secs(connect_parameters.iam_duration_seconds as u64)
- Duration::from_secs(120);

loop {
let shutdown = shutdown.clone();

tokio::select! {
_ = shutdown => break,
_ = tokio::time::sleep(duration) => {
let connect_options = connect_parameters.connect_options(&client).await?;
pool.set_connect_options(connect_options);
}
}
}

Ok(())
}

struct ConnectParameters {
Expand Down
39 changes: 6 additions & 33 deletions db_store/src/metric_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,20 @@
use std::time::Duration;

use crate::{Error, Result};

const DURATION: Duration = Duration::from_secs(300);

pub async fn start(
app_name: &str,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) -> Result<futures::future::BoxFuture<'static, Result>> {
pub async fn start(app_name: &str, pool: sqlx::Pool<sqlx::Postgres>) {
let pool_size_name = format!("{app_name}_db_pool_size");
let pool_idle_name = format!("{app_name}_db_pool_idle");
let join_handle =
tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await });

Ok(Box::pin(async move {
match join_handle.await {
Ok(()) => Ok(()),
Err(err) => Err(Error::from(err)),
}
}))
tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool).await });
}

async fn run(
size_name: String,
idle_name: String,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) {
async fn run(size_name: String, idle_name: String, pool: sqlx::Pool<sqlx::Postgres>) {
let mut trigger = tokio::time::interval(DURATION);

loop {
let shutdown = shutdown.clone();
trigger.tick().await;

tokio::select! {
_ = shutdown => {
tracing::info!("db_store: MetricTracker shutting down");
break;
}
_ = trigger.tick() => {
metrics::gauge!(size_name.clone(), pool.size() as f64);
metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
}
}
metrics::gauge!(size_name.clone(), pool.size() as f64);
metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
}
}
34 changes: 8 additions & 26 deletions db_store/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,19 @@ fn default_auth_type() -> AuthType {
}

impl Settings {
pub async fn connect(
&self,
app_name: &str,
shutdown: triggered::Listener,
) -> Result<(Pool<Postgres>, futures::future::BoxFuture<'static, Result>)> {
pub async fn connect(&self, app_name: &str) -> Result<Pool<Postgres>> {
match self.auth_type {
AuthType::Postgres => match self.simple_connect().await {
Ok(pool) => Ok((
pool.clone(),
metric_tracker::start(app_name, pool, shutdown).await?,
)),
Ok(pool) => {
metric_tracker::start(app_name, pool.clone()).await;
Ok(pool)
}
Err(err) => Err(err),
},
AuthType::Iam => {
let (pool, iam_auth_handle) =
iam_auth_pool::connect(self, shutdown.clone()).await?;
let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?;

let handle =
tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) });

Ok((
pool,
Box::pin(async move {
match handle.await {
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
Ok(_) => Ok(()),
}
}),
))
let pool = iam_auth_pool::connect(self).await?;
metric_tracker::start(app_name, pool.clone()).await;
Ok(pool)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion file_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ authors.workspace = true
license.workspace = true

[dependencies]
anyhow = {workspace = true}
clap = {workspace = true}
config = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
thiserror = {workspace = true}
tokio = { workspace = true }
tokio-util = "0"
tokio-util = { workspace = true }
tokio-stream = {workspace = true}
triggered = {workspace = true}
async-compression = {version = "0", features = ["tokio", "gzip"]}
Expand Down Expand Up @@ -46,6 +47,7 @@ sqlx = {workspace = true}
async-trait = {workspace = true}
derive_builder = "0"
retainer = {workspace = true}
task-manager = { path = "../task_manager" }

[dev-dependencies]
hex-literal = "0"
Expand Down
2 changes: 2 additions & 0 deletions file_store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub enum Error {
SendTimeout,
#[error("shutting down")]
Shutdown,
#[error("error building file info poller")]
FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError),
}

#[derive(Error, Debug)]
Expand Down
Loading

0 comments on commit 7ee54ef

Please sign in to comment.