Skip to content

Commit

Permalink
Refactor validate to be simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Sep 4, 2024
1 parent 39170be commit ee32c20
Showing 1 changed file with 72 additions and 99 deletions.
171 changes: 72 additions & 99 deletions mobile_verifier/src/promotion_reward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use file_store::{
traits::{FileSinkWriteExt, TimestampEncode},
FileType,
};
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::{
mobile_config::NetworkKeyRole,
Expand All @@ -30,7 +30,6 @@ use sqlx::{postgres::PgRow, PgPool, Postgres, Row, Transaction};
use std::{
collections::HashMap,
ops::Range,
pin::pin,
time::{Duration, Instant},
};
use task_manager::{ManagedTask, TaskManager};
Expand Down Expand Up @@ -117,21 +116,27 @@ impl PromotionRewardDaemon {
tracing::info!("Processing promotion reward file {}", file.file_info.key);

let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
let mut promotion_rewards = file.into_stream(&mut transaction).await?;

let mut verified_promotion_rewards =
pin!(ValidatedPromotionReward::validate_promotion_rewards(
reports,
while let Some(promotion_reward) = promotion_rewards.next().await {
let promotion_reward_status = validate_promotion_reward(
&promotion_reward,
&self.authorization_verifier,
&self.gateway_info_resolver,
&self.entity_verifier
));
&self.entity_verifier,
)
.await?;

while let Some(promotion_reward) = verified_promotion_rewards.try_next().await? {
promotion_reward.write(&self.promotion_rewards_sink).await?;
if promotion_reward.is_valid() {
promotion_reward.save(&mut transaction).await?;
if promotion_reward_status == PromotionRewardStatus::Valid {
save_promotion_reward(&mut transaction, &promotion_reward).await?;
}

write_promotion_reward(
&self.promotion_rewards_sink,
&promotion_reward,
promotion_reward_status,
)
.await?;
}

self.promotion_rewards_sink.commit().await?;
Expand All @@ -155,11 +160,6 @@ impl ManagedTask for PromotionRewardDaemon {
}
}

pub struct ValidatedPromotionReward {
validity: PromotionRewardStatus,
promotion_reward: PromotionReward,
}

async fn validate_promotion_reward(
promotion_reward: &PromotionReward,
authorization_verifier: &impl AuthorizationVerifier,
Expand Down Expand Up @@ -197,93 +197,66 @@ async fn validate_promotion_reward(
}
}

impl ValidatedPromotionReward {
fn validate_promotion_rewards<'a>(
promotion_rewards: impl Stream<Item = PromotionReward> + 'a,
authorization_verifier: &'a impl AuthorizationVerifier,
gateway_info_resolver: &'a impl GatewayResolver,
entity_verifier: &'a impl EntityVerifier,
) -> impl Stream<Item = anyhow::Result<Self>> + 'a {
promotion_rewards.then(move |promotion_reward| async move {
async move {
let validity = validate_promotion_reward(
&promotion_reward,
authorization_verifier,
gateway_info_resolver,
entity_verifier,
)
.await?;
Ok(Self {
validity,
promotion_reward: promotion_reward,
})
}
.await
})
}

fn is_valid(&self) -> bool {
matches!(self.validity, PromotionRewardStatus::Valid)
}
async fn write_promotion_reward(
file_sink: &FileSinkClient<VerifiedPromotionRewardV1>,
promotion_reward: &PromotionReward,
status: PromotionRewardStatus,
) -> anyhow::Result<()> {
file_sink
.write(
VerifiedPromotionRewardV1 {
report: Some(PromotionRewardIngestReportV1 {
received_timestamp: promotion_reward
.received_timestamp
.encode_timestamp_millis(),
report: Some(promotion_reward.clone().into()),
}),
status: status as i32,
timestamp: Utc::now().encode_timestamp_millis(),
},
&[("validity", status.as_str_name())],
)
.await?;
Ok(())
}

async fn write(
&self,
promotion_rewards: &FileSinkClient<VerifiedPromotionRewardV1>,
) -> anyhow::Result<()> {
promotion_rewards
.write(
VerifiedPromotionRewardV1 {
report: Some(PromotionRewardIngestReportV1 {
received_timestamp: self
.promotion_reward
.received_timestamp
.encode_timestamp_millis(),
report: Some(self.promotion_reward.clone().into()),
}),
status: self.validity as i32,
timestamp: Utc::now().encode_timestamp_millis(),
},
&[("validity", self.validity.as_str_name())],
async fn save_promotion_reward(
transaction: &mut Transaction<'_, Postgres>,
promotion_reward: &PromotionReward,
) -> anyhow::Result<()> {
match &promotion_reward.entity {
Entity::SubscriberId(subscriber_id) => {
sqlx::query(
r#"
INSERT INTO subscriber_promotion_rewards (time_of_reward, subscriber_id, carrier_key, shares)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
"#
)
.bind(promotion_reward.timestamp)
.bind(subscriber_id)
.bind(&promotion_reward.carrier_pub_key)
.bind(promotion_reward.shares as i64)
.execute(&mut *transaction)
.await?;
}
Entity::GatewayKey(gateway_key) => {
sqlx::query(
r#"
INSERT INTO gateway_promotion_rewards (time_of_reward, gateway_key, carrier_key, shares)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
"#
)
.bind(promotion_reward.timestamp)
.bind(gateway_key)
.bind(&promotion_reward.carrier_pub_key)
.bind(promotion_reward.shares as i64)
.execute(&mut *transaction)
.await?;
Ok(())
}

async fn save(self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
match self.promotion_reward.entity {
Entity::SubscriberId(subscriber_id) => {
sqlx::query(
r#"
INSERT INTO subscriber_promotion_rewards (time_of_reward, subscriber_id, carrier_key, shares)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
"#
)
.bind(self.promotion_reward.timestamp)
.bind(subscriber_id)
.bind(self.promotion_reward.carrier_pub_key)
.bind(self.promotion_reward.shares as i64)
.execute(&mut *transaction)
.await?;
}
Entity::GatewayKey(gateway_key) => {
sqlx::query(
r#"
INSERT INTO gateway_promotion_rewards (time_of_reward, gateway_key, carrier_key, shares)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
"#
)
.bind(self.promotion_reward.timestamp)
.bind(gateway_key)
.bind(self.promotion_reward.carrier_pub_key)
.bind(self.promotion_reward.shares as i64)
.execute(&mut *transaction)
.await?;
}
}
Ok(())
}
Ok(())
}

pub async fn clear_promotion_rewards(
Expand Down

0 comments on commit ee32c20

Please sign in to comment.