Skip to content

Commit

Permalink
Use abci_query instead of gRPC queries during health-check (#4102)
Browse files Browse the repository at this point in the history
* Use Abci query instead of gRPC queries during health-check

* Add changelog entry
  • Loading branch information
ljoss17 authored Aug 26, 2024
1 parent 7106351 commit 4b97bfa
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Use `abci_query` instead of gRPC queries when retrieving staking params
and service config during health-check, and when retrieving version information.
([\#4102](https://github.com/informalsystems/hermes/issues/4102))
155 changes: 43 additions & 112 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use alloc::sync::Arc;
use bytes::Buf;
use bytes::Bytes;
use core::{future::Future, str::FromStr, time::Duration};
use query::connection::query_connection_params;
use std::{cmp::Ordering, thread};

use bytes::{Buf, Bytes};
use futures::future::join_all;
use num_bigint::BigInt;
use prost::Message;
use std::cmp::Ordering;
use std::thread;
use tokio::runtime::Runtime as TokioRuntime;
use tonic::codegen::http::Uri;
use tonic::metadata::AsciiMetadataValue;
use tracing::{debug, error, instrument, trace, warn};

use ibc_proto::cosmos::base::node::v1beta1::ConfigResponse;
use ibc_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient;
use ibc_proto::cosmos::base::tendermint::v1beta1::{GetSyncingRequest, GetSyncingResponse};
use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams;
use ibc_proto::cosmos::staking::v1beta1::{Params as StakingParams, QueryParamsResponse};
use ibc_proto::ibc::apps::fee::v1::{
QueryIncentivizedPacketRequest, QueryIncentivizedPacketResponse,
};
Expand Down Expand Up @@ -76,6 +75,7 @@ use crate::chain::cosmos::fee::maybe_register_counterparty_payee;
use crate::chain::cosmos::gas::{calculate_fee, mul_ceil};
use crate::chain::cosmos::query::account::get_or_fetch_account;
use crate::chain::cosmos::query::balance::{query_all_balances, query_balance};
use crate::chain::cosmos::query::connection::query_connection_params;
use crate::chain::cosmos::query::consensus_state::query_consensus_state_heights;
use crate::chain::cosmos::query::custom::cross_chain_query_via_rpc;
use crate::chain::cosmos::query::denom_trace::query_denom_trace;
Expand Down Expand Up @@ -411,26 +411,20 @@ impl CosmosSdkChain {
);
crate::telemetry!(query, self.id(), "query_staking_params");

let mut client = self
.block_on(
ibc_proto::cosmos::staking::v1beta1::query_client::QueryClient::connect(
self.grpc_addr.clone(),
),
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request =
tonic::Request::new(ibc_proto::cosmos::staking::v1beta1::QueryParamsRequest {});

let response = self
.block_on(client.params(request))
.map_err(|e| Error::grpc_status(e, "query_staking_params".to_owned()))?;
let query_response = self.block_on(abci_query(
&self.rpc_client,
&self.config().rpc_addr,
"/cosmos.staking.v1beta1.Query/Params".to_owned(),
"".to_owned(),
QueryHeight::Latest.into(),
false,
))?;
let params_response =
QueryParamsResponse::decode(query_response.value.as_ref()).map_err(|e| {
Error::protobuf_decode("cosmos.staking.v1beta1.Query/Params".to_owned(), e)
})?;

let params = response
.into_inner()
let params = params_response
.params
.ok_or_else(|| Error::grpc_response_param("no staking params".to_string()))?;

Expand All @@ -454,45 +448,20 @@ impl CosmosSdkChain {
);
crate::telemetry!(query, self.id(), "query_config_params");

// Helper function to diagnose if the node config query is unimplemented
// by matching on the error details.
fn is_unimplemented_node_query(err_status: &tonic::Status) -> bool {
if err_status.code() != tonic::Code::Unimplemented {
return false;
}

err_status
.message()
.contains("unknown service cosmos.base.node.v1beta1.Service")
}

let mut client = self
.block_on(
ibc_proto::cosmos::base::node::v1beta1::service_client::ServiceClient::connect(
self.grpc_addr.clone(),
),
)
.map_err(Error::grpc_transport)?;

client = client
.max_decoding_message_size(self.config().max_grpc_decoding_size.get_bytes() as usize);

let request = tonic::Request::new(ibc_proto::cosmos::base::node::v1beta1::ConfigRequest {});

match self.block_on(client.config(request)) {
Ok(response) => {
let params = response.into_inner();
let query_response = self.block_on(abci_query(
&self.rpc_client,
&self.config().rpc_addr,
"/cosmos.base.node.v1beta1.Service/Config".to_owned(),
"".to_owned(),
QueryHeight::Latest.into(),
false,
))?;
let config_response =
ConfigResponse::decode(query_response.value.as_ref()).map_err(|e| {
Error::protobuf_decode("cosmos.base.node.v1beta1.Service/Config".to_owned(), e)
})?;

Ok(Some(params))
}
Err(e) => {
if is_unimplemented_node_query(&e) {
Ok(None)
} else {
Err(Error::grpc_status(e, "query_config_params".to_owned()))
}
}
}
Ok(Some(config_response))
}

/// The minimum gas price that this node accepts
Expand Down Expand Up @@ -664,43 +633,6 @@ impl CosmosSdkChain {
}
}

/// Query the chain syncing status via a gRPC query.
///
/// Returns an error if the node is still syncing and has not caught up,
/// ie. if `sync_info.syncing` is `true`.
fn chain_grpc_status(&self) -> Result<GetSyncingResponse, Error> {
crate::time!(
"chain_grpc_status",
{
"src_chain": self.config().id.to_string(),
}
);
crate::telemetry!(query, self.id(), "grpc_status");

let grpc_addr = self.grpc_addr.clone();
let grpc_addr_string = grpc_addr.to_string();

let mut client = self
.block_on(ServiceClient::connect(grpc_addr.clone()))
.map_err(Error::grpc_transport)?;

let request = tonic::Request::new(GetSyncingRequest {});

let sync_info = self
.block_on(client.get_syncing(request))
.map_err(|e| Error::grpc_status(e, "get_syncing".to_string()))?
.into_inner();

if sync_info.syncing {
Err(Error::chain_not_caught_up(
grpc_addr_string,
self.config().id.clone(),
))
} else {
Ok(sync_info)
}
}

/// Query the chain status of the RPC and gRPC nodes.
///
/// Returns an error if any of the node is still syncing and has not caught up.
Expand All @@ -722,15 +654,6 @@ impl CosmosSdkChain {
));
}

let grpc_status = self.chain_grpc_status()?;

if grpc_status.syncing {
return Err(Error::chain_not_caught_up(
self.config.grpc_addr.to_string(),
self.config().id.clone(),
));
}

Ok(rpc_status)
}

Expand Down Expand Up @@ -1190,7 +1113,11 @@ impl ChainEndpoint for CosmosSdkChain {
}

fn version_specs(&self) -> Result<Specs, Error> {
let version_specs = self.block_on(fetch_version_specs(self.id(), &self.grpc_addr))?;
let version_specs = self.block_on(fetch_version_specs(
self.id(),
&self.rpc_client,
&self.config.rpc_addr,
))?;
Ok(version_specs)
}

Expand Down Expand Up @@ -2857,7 +2784,11 @@ fn do_health_check(chain: &CosmosSdkChain) -> Result<(), Error> {
),
}

let version_specs = chain.block_on(fetch_version_specs(&chain.config.id, &chain.grpc_addr))?;
let version_specs = chain.block_on(fetch_version_specs(
&chain.config.id,
&chain.rpc_client,
&chain.config.rpc_addr,
))?;

if let Err(diagnostic) = compatibility::run_diagnostic(&version_specs) {
return Err(Error::compat_check_failed(
Expand Down
52 changes: 24 additions & 28 deletions crates/relayer/src/chain/cosmos/query.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use http::uri::Uri;
use ibc_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient;
use ibc_proto::cosmos::base::tendermint::v1beta1::GetNodeInfoRequest;
use ibc_proto::cosmos::base::tendermint::v1beta1::GetNodeInfoResponse;
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
use ibc_relayer_types::core::ics23_commitment::merkle::{
convert_tm_to_ics_merkle_proof, MerkleProof,
};
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use prost::Message;
use tendermint::block::Height;
use tendermint_rpc::query::Query;
use tendermint_rpc::{Client, HttpClient, Url};

use crate::chain::cosmos::version::Specs;
use crate::chain::requests::QueryHeight;
use crate::chain::requests::{QueryClientEventRequest, QueryPacketEventDataRequest, QueryTxHash};
use crate::error::Error;

Expand Down Expand Up @@ -121,42 +121,38 @@ pub async fn abci_query(
}

/// Queries the chain to obtain the version information.
pub async fn fetch_version_specs(chain_id: &ChainId, grpc_address: &Uri) -> Result<Specs, Error> {
let grpc_addr_string = grpc_address.to_string();

// Construct a gRPC client
let mut client = ServiceClient::connect(grpc_address.clone())
.await
.map_err(|e| {
Error::fetch_version_grpc_transport(
chain_id.clone(),
grpc_addr_string.clone(),
"tendermint::ServiceClient".to_string(),
pub async fn fetch_version_specs(
chain_id: &ChainId,
rpc_client: &HttpClient,
rpc_addr: &Url,
) -> Result<Specs, Error> {
let query_response = abci_query(
rpc_client,
rpc_addr,
"/cosmos.base.tendermint.v1beta1.Service/GetNodeInfo".to_owned(),
"".to_owned(),
QueryHeight::Latest.into(),
false,
)
.await?;
let node_info_response =
GetNodeInfoResponse::decode(query_response.value.as_ref()).map_err(|e| {
Error::protobuf_decode(
"cosmos.base.tendermint.v1beta1.Service/GetNodeInfo".to_owned(),
e,
)
})?;

let request = tonic::Request::new(GetNodeInfoRequest {});

let response = client.get_node_info(request).await.map_err(|e| {
Error::fetch_version_grpc_status(
chain_id.clone(),
grpc_addr_string.clone(),
"tendermint::ServiceClient".to_string(),
e,
)
})?;

let version = response.into_inner().application_version.ok_or_else(|| {
let version = node_info_response.application_version.ok_or_else(|| {
Error::fetch_version_invalid_version_response(
chain_id.clone(),
grpc_addr_string.clone(),
rpc_addr.to_string(),
"tendermint::GetNodeInfoRequest".to_string(),
)
})?;

// Parse the raw version info into a domain-type `version::Specs`
version
.try_into()
.map_err(|e| Error::fetch_version_parsing(chain_id.clone(), grpc_addr_string.clone(), e))
.map_err(|e| Error::fetch_version_parsing(chain_id.clone(), rpc_addr.to_string(), e))
}

0 comments on commit 4b97bfa

Please sign in to comment.