Skip to content

Commit

Permalink
Move things around
Browse files Browse the repository at this point in the history
  • Loading branch information
maplant committed May 15, 2024
1 parent bf83fb4 commit 198a838
Showing 1 changed file with 166 additions and 106 deletions.
272 changes: 166 additions & 106 deletions mobile_verifier/src/boosting_oracles/data_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,12 @@ where
new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some();
if self.data_sets.is_ready() && new_data_set {
tracing::info!("Processing new data sets");
let boosting_reports = set_oracle_boosting_assignments(
UnassignedHex::fetch_all(&self.pool),
&self.data_sets,
set_all_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink,
)
.await?;
self.oracle_boosting_sink
.write_all(boosting_reports)
.await?;
}

// Mark the new data sets as processed and delete the old ones
Expand Down Expand Up @@ -324,15 +321,12 @@ where
// Attempt to fill in any unassigned hexes. This is for the edge case in
// which we shutdown before a coverage object updates.
if self.data_sets.is_ready() {
let boosting_reports = set_oracle_boosting_assignments(
UnassignedHex::fetch_unassigned(&self.pool),
&self.data_sets,
set_unassigned_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink,
)
.await?;
self.oracle_boosting_sink
.write_all(boosting_reports)
.await?;
}

loop {
Expand All @@ -342,15 +336,11 @@ where
// If we see a new coverage object, we want to assign only those hexes
// that don't have an assignment
if self.data_sets.is_ready() {
let boosting_reports = set_oracle_boosting_assignments(
UnassignedHex::fetch_unassigned(&self.pool),
&self.data_sets,
set_unassigned_oracle_boosting_assignments(
&self.pool,
)
.await?;
self.oracle_boosting_sink
.write_all(boosting_reports)
.await?;
&self.data_sets,
&self.oracle_boosting_sink
).await?;
}
},
_ = tokio::time::sleep(poll_duration.to_std()?) => {
Expand Down Expand Up @@ -560,113 +550,183 @@ pub mod db {
}
}

#[derive(FromRow)]
pub struct UnassignedHex {
uuid: uuid::Uuid,
#[sqlx(try_from = "i64")]
hex: u64,
signal_level: SignalLevel,
signal_power: i32,
pub struct AssignedCoverageObjects {
coverage_objs: HashMap<uuid::Uuid, Vec<AssignedHex>>,
}

impl UnassignedHex {
pub fn fetch_all(pool: &PgPool) -> impl Stream<Item = sqlx::Result<Self>> + '_ {
sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool)
impl AssignedCoverageObjects {
async fn from_stream(
stream: impl Stream<Item = sqlx::Result<UnassignedHex>>,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<Self> {
let mut coverage_objs = HashMap::<uuid::Uuid, Vec<AssignedHex>>::new();
let mut stream = pin!(stream);
while let Some(hex) = stream.try_next().await? {
let hex = hex.assign(data_sets)?;
coverage_objs.entry(hex.uuid).or_default().push(hex);
}
Ok(Self { coverage_objs })
}

pub fn fetch_unassigned(pool: &PgPool) -> impl Stream<Item = sqlx::Result<Self>> + '_ {
sqlx::query_as(
"SELECT
async fn fetch_all(
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<Self> {
Self::from_stream(
sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool),
data_sets,
)
.await
}

async fn fetch_unassigned(
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<Self> {
Self::from_stream(
sqlx::query_as(
"SELECT
uuid, hex, signal_level, signal_power
FROM
hexes
WHERE
urbanized IS NULL
OR footfall IS NULL
OR landtype IS NULL",
)
.fetch(pool),
data_sets,
)
.fetch(pool)
.await
}
}

pub async fn set_oracle_boosting_assignments<'a>(
unassigned_hexes: impl Stream<Item = sqlx::Result<UnassignedHex>>,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
pool: &'a PgPool,
) -> anyhow::Result<impl Iterator<Item = proto::OracleBoostingReportV1>> {
const NUMBER_OF_FIELDS_IN_QUERY: u16 = 7;
const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize;

let now = Utc::now();
let mut boost_results = HashMap::<uuid::Uuid, Vec<proto::OracleBoostingHexAssignment>>::new();
let mut unassigned_hexes = pin!(unassigned_hexes.try_chunks(ASSIGNMENTS_MAX_BATCH_ENTRIES));

while let Some(hexes) = unassigned_hexes.try_next().await? {
let hexes: anyhow::Result<Vec<_>> = hexes
.into_iter()
.map(|hex| {
let cell = hextree::Cell::try_from(hex.hex)?;
let assignments = HexAssignments::builder(cell)
.footfall(&data_sets.footfall)
.landtype(&data_sets.landtype)
.urbanized(&data_sets.urbanization)
.build()?;
let location = format!("{:x}", hex.hex);
let assignment_multiplier = (assignments.boosting_multiplier() * dec!(1000))
async fn write(&self, boosting_reports: &FileSinkClient) -> file_store::Result {
let timestamp = Utc::now().encode_timestamp();
for (uuid, hexes) in self.coverage_objs.iter() {
let assignments: Vec<_> = hexes
.iter()
.map(|hex| {
let location = format!("{:x}", hex.hex);
let assignment_multiplier = (hex.assignments.boosting_multiplier()
* dec!(1000))
.to_u32()
.unwrap_or(0);

boost_results.entry(hex.uuid).or_default().push(
proto::OracleBoostingHexAssignment {
location,
urbanized: assignments.urbanized.into(),
footfall: assignments.footfall.into(),
landtype: assignments.landtype.into(),
urbanized: hex.assignments.urbanized.into(),
footfall: hex.assignments.footfall.into(),
landtype: hex.assignments.landtype.into(),
assignment_multiplier,
}
})
.collect();
boosting_reports
.write(
proto::OracleBoostingReportV1 {
coverage_object: Vec::from(uuid.into_bytes()),
assignments,
timestamp,
},
);

Ok((
hex,
assignments.footfall,
assignments.landtype,
assignments.urbanized,
))
})
.collect();

QueryBuilder::new(
"INSERT INTO hexes (uuid, hex, signal_level, signal_power, footfall, landtype, urbanized)",
)
.push_values(hexes?, |mut b, (hex, footfall, landtype, urbanized)| {
b.push_bind(hex.uuid)
.push_bind(hex.hex as i64)
.push_bind(hex.signal_level)
.push_bind(hex.signal_power)
.push_bind(footfall)
.push_bind(landtype)
.push_bind(urbanized);
&[],
)
.await?;
}

Ok(())
}

async fn save(self, pool: &PgPool) -> anyhow::Result<()> {
const NUMBER_OF_FIELDS_IN_QUERY: u16 = 7;
const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize =
(u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize;

let assigned_hexes: Vec<_> = self.coverage_objs.into_values().flatten().collect();
for assigned_hexes in assigned_hexes.chunks(ASSIGNMENTS_MAX_BATCH_ENTRIES) {
QueryBuilder::new(
"INSERT INTO hexes (uuid, hex, signal_level, signal_power, footfall, landtype, urbanized)",
)
.push_values(assigned_hexes, |mut b, hex| {
b.push_bind(hex.uuid)
.push_bind(hex.hex as i64)
.push_bind(hex.signal_level)
.push_bind(hex.signal_power)
.push_bind(hex.assignments.footfall)
.push_bind(hex.assignments.landtype)
.push_bind(hex.assignments.urbanized);
})
.push(
r#"
ON CONFLICT (uuid, hex) DO UPDATE SET
footfall = EXCLUDED.footfall,
landtype = EXCLUDED.landtype,
urbanized = EXCLUDED.urbanized
"#,
)
.build()
.execute(pool)
.await?;
}

Ok(())
}
}

#[derive(FromRow)]
pub struct UnassignedHex {
uuid: uuid::Uuid,
#[sqlx(try_from = "i64")]
hex: u64,
signal_level: SignalLevel,
signal_power: i32,
}

impl UnassignedHex {
fn assign(
self,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<AssignedHex> {
let cell = hextree::Cell::try_from(self.hex)?;
let assignments = HexAssignments::builder(cell)
.footfall(&data_sets.footfall)
.landtype(&data_sets.landtype)
.urbanized(&data_sets.urbanization)
.build()?;
Ok(AssignedHex {
uuid: self.uuid,
hex: self.hex,
signal_level: self.signal_level,
signal_power: self.signal_power,
assignments,
})
.push(
r#"
ON CONFLICT (uuid, hex) DO UPDATE SET
footfall = EXCLUDED.footfall,
landtype = EXCLUDED.landtype,
urbanized = EXCLUDED.urbanized
"#,
)
.build()
.execute(pool)
.await?;
}
}

pub struct AssignedHex {
uuid: uuid::Uuid,
hex: u64,
signal_level: SignalLevel,
signal_power: i32,
assignments: HexAssignments,
}

pub async fn set_all_oracle_boosting_assignments(
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
file_sink: &FileSinkClient,
) -> anyhow::Result<()> {
let assigned_coverage_objs = AssignedCoverageObjects::fetch_all(pool, data_sets).await?;
assigned_coverage_objs.write(file_sink).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}

Ok(boost_results
.into_iter()
.map(
move |(coverage_object, assignments)| proto::OracleBoostingReportV1 {
coverage_object: Vec::from(coverage_object.into_bytes()),
assignments,
timestamp: now.encode_timestamp(),
},
))
pub async fn set_unassigned_oracle_boosting_assignments(
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
file_sink: &FileSinkClient,
) -> anyhow::Result<()> {
let assigned_coverage_objs = AssignedCoverageObjects::fetch_unassigned(pool, data_sets).await?;
assigned_coverage_objs.write(file_sink).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}

0 comments on commit 198a838

Please sign in to comment.