Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ pub struct EnvVars {
pub firehose_disable_extended_blocks_for_chains: Vec<String>,

pub block_write_capacity: usize,

/// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_RETRY_LIMIT`.
/// The default value is 10.
pub firehose_block_fetch_retry_limit: usize,
/// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`.
/// The default value is 60 seconds.
pub firehose_block_fetch_timeout: u64,
}

impl EnvVars {
Expand Down Expand Up @@ -330,6 +337,8 @@ impl EnvVars {
inner.firehose_disable_extended_blocks_for_chains,
),
block_write_capacity: inner.block_write_capacity.0,
firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit,
firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
})
}

Expand Down Expand Up @@ -493,6 +502,10 @@ struct Inner {
firehose_disable_extended_blocks_for_chains: Option<String>,
#[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")]
block_write_capacity: NoUnderscores<usize>,
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_RETRY_LIMIT", default = "10")]
firehose_block_fetch_retry_limit: usize,
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")]
firehose_block_fetch_timeout: u64,
}

#[derive(Clone, Debug)]
Expand Down
33 changes: 26 additions & 7 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use futures03::StreamExt;
use http0::uri::{Scheme, Uri};
use itertools::Itertools;
use slog::{error, info, Logger};
use slog::{error, info, trace, Logger};
use std::{collections::HashMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration};
use tokio::sync::OnceCell;
use tonic::codegen::InterceptedService;
Expand All @@ -33,6 +33,7 @@ use crate::components::network_provider::NetworkDetails;
use crate::components::network_provider::ProviderCheckStrategy;
use crate::components::network_provider::ProviderManager;
use crate::components::network_provider::ProviderName;
use crate::prelude::retry;

/// This is constant because we found this magic number of connections after
/// which the grpc connections start to hang.
Expand Down Expand Up @@ -425,7 +426,7 @@ impl FirehoseEndpoint {
}

pub async fn load_blocks_by_numbers<M>(
&self,
self: Arc<Self>,
numbers: Vec<u64>,
logger: &Logger,
) -> Result<Vec<M>, anyhow::Error>
Expand All @@ -435,21 +436,39 @@ impl FirehoseEndpoint {
let mut blocks = Vec::with_capacity(numbers.len());

for number in numbers {
debug!(
let provider_name = self.provider.as_str();

trace!(
logger,
"Loading block for block number {}", number;
"provider" => self.provider.as_str(),
"provider" => provider_name,
);

match self.get_block_by_number::<M>(number, logger).await {
let retry_log_message = format!("get_block_by_number for block {}", number);
let endpoint_for_retry = self.cheap_clone();

let logger_for_retry = logger.clone();
let logger_for_error = logger.clone();

let block = retry(retry_log_message, &logger_for_retry)
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
.run(move || {
let e = endpoint_for_retry.cheap_clone();
let l = logger_for_retry.clone();
async move { e.get_block_by_number::<M>(number, &l).await }
})
.await;

match block {
Ok(block) => {
blocks.push(block);
}
Err(e) => {
error!(
logger,
logger_for_error,
"Failed to load block number {}: {}", number, e;
"provider" => self.provider.as_str(),
"provider" => provider_name,
);
return Err(anyhow::format_err!(
"failed to load block number {}: {}",
Expand Down
Loading