Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mobile Config] Add device_types filter to stream_gateways_info #880

Merged
merged 13 commits into from
Oct 28, 2024
58 changes: 37 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch =
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "mobile-config-filteres-hostpots-stream", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "mobile-config-filteres-hostpots-stream" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
14 changes: 12 additions & 2 deletions mobile_config/src/client/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use file_store::traits::MsgVerify;
use futures::stream::{self, StreamExt};
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
use helium_proto::{
services::{mobile_config, Channel},
services::{
mobile_config::{self, DeviceType},
Channel,
},
Message,
};
use retainer::Cache;
Expand Down Expand Up @@ -50,7 +53,10 @@ pub trait GatewayInfoResolver: Clone + Send + Sync + 'static {
address: &PublicKeyBinary,
) -> Result<Option<GatewayInfo>, Self::Error>;

async fn stream_gateways_info(&mut self) -> Result<GatewayInfoStream, Self::Error>;
async fn stream_gateways_info(
&mut self,
device_types: &[DeviceType],
) -> Result<GatewayInfoStream, Self::Error>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -92,11 +98,15 @@ impl GatewayInfoResolver for GatewayClient {
Ok(response)
}

/// Returns all gateways if device_types is empty
/// Otherwise, only selected device_types
async fn stream_gateways_info(
&mut self,
device_types: &[DeviceType],
) -> Result<gateway_info::GatewayInfoStream, Self::Error> {
let mut req = mobile_config::GatewayInfoStreamReqV1 {
batch_size: self.batch_size,
device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(),
signer: self.signing_key.public_key().into(),
signature: vec![],
};
Expand Down
34 changes: 30 additions & 4 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ pub enum DeviceType {
WifiDataOnly,
}

impl DeviceType {
fn to_sql_argument(&self) -> String {
match self {
DeviceType::Cbrs => "\"cbrs\"".to_string(),
DeviceType::WifiIndoor => "\"wifiIndoor\"".to_string(),
DeviceType::WifiOutdoor => "\"wifiOutdoor\"".to_string(),
DeviceType::WifiDataOnly => "\"wifiDataOnly\"".to_string(),
}
}
}
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved

impl From<DeviceTypeProto> for DeviceType {
fn from(dtp: DeviceTypeProto) -> Self {
match dtp {
Expand Down Expand Up @@ -116,9 +127,11 @@ pub(crate) mod db {
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
}

pub async fn get_info(
Expand Down Expand Up @@ -153,11 +166,24 @@ pub(crate) mod db {

pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
) -> impl Stream<Item = GatewayInfo> + 'a {
sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed()
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(
device_types
.iter()
.map(|v| v.to_sql_argument())
.collect::<Vec<_>>(),
)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
}
}

impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for GatewayInfo {
Expand Down
11 changes: 8 additions & 3 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
gateway_info::{self, GatewayInfo},
gateway_info::{self, DeviceType, GatewayInfo},
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
Expand Down Expand Up @@ -156,16 +156,21 @@ impl mobile_config::Gateway for GatewayService {
let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request)?;

tracing::debug!("fetching all gateways' info");
tracing::debug!(
"fetching all gateways' info. Device types: {:?} ",
request.device_types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this debug statement come after the map below? it would debug to a vec of i32s before that conversion, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed

);

let pool = self.metadata_pool.clone();
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;

let (tx, rx) = tokio::sync::mpsc::channel(100);

let device_types: Vec<DeviceType> = request.device_types().map(|v| v.into()).collect();

tokio::spawn(async move {
let stream = gateway_info::db::all_info_stream(&pool);
let stream = gateway_info::db::all_info_stream(&pool, &device_types);
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down
Loading