Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mobile Config] Add device_types filter to stream_gateways_info #880

Merged
merged 13 commits into from
Oct 28, 2024
58 changes: 37 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch =
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "mobile-config-filteres-hostpots-stream", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "mobile-config-filteres-hostpots-stream" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
14 changes: 12 additions & 2 deletions mobile_config/src/client/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use file_store::traits::MsgVerify;
use futures::stream::{self, StreamExt};
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
use helium_proto::{
services::{mobile_config, Channel},
services::{
mobile_config::{self, DeviceType},
Channel,
},
Message,
};
use retainer::Cache;
Expand Down Expand Up @@ -50,7 +53,10 @@ pub trait GatewayInfoResolver: Clone + Send + Sync + 'static {
address: &PublicKeyBinary,
) -> Result<Option<GatewayInfo>, Self::Error>;

async fn stream_gateways_info(&mut self) -> Result<GatewayInfoStream, Self::Error>;
async fn stream_gateways_info(
&mut self,
device_types: &[DeviceType],
) -> Result<GatewayInfoStream, Self::Error>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -92,11 +98,15 @@ impl GatewayInfoResolver for GatewayClient {
Ok(response)
}

/// Returns all gateways if device_types is empty
/// Otherwise, only selected device_types
async fn stream_gateways_info(
&mut self,
device_types: &[DeviceType],
) -> Result<gateway_info::GatewayInfoStream, Self::Error> {
let mut req = mobile_config::GatewayInfoStreamReqV1 {
batch_size: self.batch_size,
device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(),
signer: self.signing_key.public_key().into(),
signature: vec![],
};
Expand Down
33 changes: 29 additions & 4 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ impl From<DeviceTypeProto> for DeviceType {
#[error("invalid device type string")]
pub struct DeviceTypeParseError;

impl std::fmt::Display for DeviceType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DeviceType::Cbrs => write!(f, "cbrs"),
DeviceType::WifiIndoor => write!(f, "wifiIndoor"),
DeviceType::WifiOutdoor => write!(f, "wifiOutdoor"),
DeviceType::WifiDataOnly => write!(f, "wifiDataOnly"),
}
}
}
impl std::str::FromStr for DeviceType {
type Err = DeviceTypeParseError;

Expand Down Expand Up @@ -116,9 +126,11 @@ pub(crate) mod db {
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
}

pub async fn get_info(
Expand Down Expand Up @@ -153,11 +165,24 @@ pub(crate) mod db {

pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
) -> impl Stream<Item = GatewayInfo> + 'a {
sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed()
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(
device_types
.iter()
.map(|v| format!("\"{}\"", v))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the escaping here is required ? A note to explain why would be useful for future readers, especially given the table schema is managed outside of the oracles

Copy link
Member Author

@kurotych kurotych Oct 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The device_types field has a jsonb type but is being used as a string, which forces us to add quotes.
image


image

Column in "meta" database:
image

(Added comment to code)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure it is a better idea but you can query the field normally like this
WHERE mhi.device_type #>> '{}' = 'wifiDataOnly'

.collect::<Vec<_>>(),
)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
}
}

impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for GatewayInfo {
Expand Down
13 changes: 9 additions & 4 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
gateway_info::{self, GatewayInfo},
gateway_info::{self, DeviceType, GatewayInfo},
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
Expand Down Expand Up @@ -156,16 +156,21 @@ impl mobile_config::Gateway for GatewayService {
let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request)?;

tracing::debug!("fetching all gateways' info");

let pool = self.metadata_pool.clone();
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;

let (tx, rx) = tokio::sync::mpsc::channel(100);

let device_types: Vec<DeviceType> = request.device_types().map(|v| v.into()).collect();

tracing::debug!(
"fetching all gateways' info. Device types: {:?} ",
device_types
);

tokio::spawn(async move {
let stream = gateway_info::db::all_info_stream(&pool);
let stream = gateway_info::db::all_info_stream(&pool, &device_types);
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down
Loading