Skip to content

Commit

Permalink
CMove to insert or update model for seniority
Browse files Browse the repository at this point in the history
  • Loading branch information
maplant committed Jul 19, 2023
1 parent fee3eb3 commit e35c82b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 39 deletions.
29 changes: 18 additions & 11 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,6 @@ pub struct CoverageReward {

pub const MAX_RADIOS_PER_HEX: usize = 5;

#[derive(Default)]
pub struct CoveredHexes {
hexes: HashMap<CellIndex, [BTreeMap<SignalLevel, BinaryHeap<CoverageLevel>>; 2]>,
}

#[async_trait::async_trait]
pub trait CoveredHexStream {
async fn covered_hex_stream<'a>(
Expand All @@ -258,18 +253,25 @@ impl CoveredHexStream for Pool<Postgres> {
// Adjust the coverage
let seniority: Seniority = sqlx::query_as(
r#"
SELECT * FROM seniority WHERE cbsd_id = $1 AND last_heartbeat <= $2 ORDER BY last_heartbeat DESC
SELECT * FROM seniority
WHERE
cbsd_id = $1 AND
inserted_at <= $2
ORDER BY inserted_at DESC
LIMIT 1
"#,
)
.bind(cbsd_id)
.bind(period_end)
.bind(cbsd_id)
.bind(period_end)
.fetch_one(self)
.await?;
.await?;

// We can safely delete any seniority objects that appear before the latest in the reward period
sqlx::query("DELETE FROM seniority WHERE last_heartbeat < $1")
.bind(seniority.last_heartbeat)
sqlx::query("DELETE FROM seniority WHERE inserted_at < $1")
.bind(seniority.inserted_at)
.execute(self)
.await?;

Ok(
sqlx::query_as("SELECT * FROM hex_coverage WHERE cbsd_id = $1 AND uuid = $2")
.bind(cbsd_id)
Expand All @@ -284,6 +286,11 @@ impl CoveredHexStream for Pool<Postgres> {
}
}

#[derive(Default)]
pub struct CoveredHexes {
hexes: HashMap<CellIndex, [BTreeMap<SignalLevel, BinaryHeap<CoverageLevel>>; 2]>,
}

impl CoveredHexes {
pub async fn aggregate_coverage<E>(
&mut self,
Expand Down
83 changes: 55 additions & 28 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ pub struct Seniority {
pub uuid: Uuid,
pub seniority_ts: DateTime<Utc>,
pub last_heartbeat: DateTime<Utc>,
pub inserted_at: DateTime<Utc>,
}

impl Heartbeat {
Expand Down Expand Up @@ -347,6 +348,11 @@ impl Heartbeat {
coverage_claim_time
};

enum InsertOrUpdate {
Insert(proto::SeniorityUpdateReason),
Update(DateTime<Utc>),
}

let (seniority_ts, update_reason) = if let Some(prev_seniority) =
sqlx::query_as::<_, Seniority>(
"SELECT * FROM seniority WHERE cbsd_id = $1 ORDER BY last_heartbeat DESC",
Expand All @@ -358,51 +364,72 @@ impl Heartbeat {
if self.coverage_object != Some(prev_seniority.uuid) {
(
coverage_claim_time,
Some(proto::SeniorityUpdateReason::NewCoverageClaimTime),
InsertOrUpdate::Insert(proto::SeniorityUpdateReason::NewCoverageClaimTime),
)
} else if self.timestamp - prev_seniority.last_heartbeat > Duration::days(3)
&& coverage_claim_time < self.timestamp
{
(
self.timestamp,
Some(proto::SeniorityUpdateReason::HeartbeatNotSeen),
InsertOrUpdate::Insert(proto::SeniorityUpdateReason::HeartbeatNotSeen),
)
} else {
(coverage_claim_time, None)
(
coverage_claim_time,
InsertOrUpdate::Update(prev_seniority.seniority_ts),
)
}
} else {
(
coverage_claim_time,
Some(proto::SeniorityUpdateReason::NewCoverageClaimTime),
InsertOrUpdate::Insert(proto::SeniorityUpdateReason::NewCoverageClaimTime),
)
};

sqlx::query(
r#"
INSERT INTO seniority
(cbsd_id, last_heartbeat, uuid, seniority_ts)
VALUES
($1, $2, $3, $4)
"#,
)
.bind(&self.cbsd_id)
.bind(self.timestamp)
.bind(self.coverage_object)
.bind(seniority_ts)
.execute(&mut *exec)
.await?;

if let Some(update_reason) = update_reason {
seniorities
.write(
proto::SeniorityUpdate {
cbsd_id: self.cbsd_id.to_string(),
new_seniority_timestamp: seniority_ts.timestamp() as u64,
reason: update_reason as i32,
},
[],
match update_reason {
InsertOrUpdate::Insert(update_reason) => {
sqlx::query(
r#"
INSERT INTO seniority
(cbsd_id, last_heartbeat, uuid, seniority_ts, inserted_at)
VALUES
($1, $2, $3, $4, $5)
"#,
)
.bind(&self.cbsd_id)
.bind(self.timestamp)
.bind(self.coverage_object)
.bind(seniority_ts)
.bind(self.timestamp)
.execute(&mut *exec)
.await?;
seniorities
.write(
proto::SeniorityUpdate {
cbsd_id: self.cbsd_id.to_string(),
new_seniority_timestamp: seniority_ts.timestamp() as u64,
reason: update_reason as i32,
},
[],
)
.await?;
}
InsertOrUpdate::Update(seniority_ts) => {
sqlx::query(
r#"
UPDATE seniority
SET last_heartbeat = $1
WHERE
cbsd_id = $2 AND
seniority_ts = $3
"#,
)
.bind(self.timestamp)
.bind(&self.cbsd_id)
.bind(seniority_ts)
.execute(&mut *exec)
.await?;
}
}

Ok(())
Expand Down

0 comments on commit e35c82b

Please sign in to comment.