From c599f703016072ebc625a3e0416b6beee1794017 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Tue, 25 Feb 2025 17:25:50 +0000 Subject: [PATCH] fix substreams info endpoint --- chain/ethereum/examples/firehose.rs | 1 + chain/substreams/examples/substreams.rs | 1 + graph/build.rs | 1 + graph/proto/substreams-rpc.proto | 157 ++-- graph/src/firehose/endpoints.rs | 83 +- graph/src/substreams_rpc/sf.firehose.v2.rs | 884 ++++++++++++++++++ .../substreams_rpc/sf.substreams.rpc.v2.rs | 378 +++++++- node/src/chain.rs | 2 + tests/src/fixture/mod.rs | 1 + 9 files changed, 1392 insertions(+), 116 deletions(-) create mode 100644 graph/src/substreams_rpc/sf.firehose.v2.rs diff --git a/chain/ethereum/examples/firehose.rs b/chain/ethereum/examples/firehose.rs index e5f85964fe1..5a70794dfe2 100644 --- a/chain/ethereum/examples/firehose.rs +++ b/chain/ethereum/examples/firehose.rs @@ -38,6 +38,7 @@ async fn main() -> Result<(), Error> { false, SubgraphLimit::Unlimited, metrics, + false, )); loop { diff --git a/chain/substreams/examples/substreams.rs b/chain/substreams/examples/substreams.rs index eac9397b893..a5af2bbe25c 100644 --- a/chain/substreams/examples/substreams.rs +++ b/chain/substreams/examples/substreams.rs @@ -57,6 +57,7 @@ async fn main() -> Result<(), Error> { false, SubgraphLimit::Unlimited, Arc::new(endpoint_metrics), + true, )); let client = Arc::new(ChainClient::new_firehose(FirehoseEndpoints::for_testing( diff --git a/graph/build.rs b/graph/build.rs index 3cc00c0dc07..4ad3c03a459 100644 --- a/graph/build.rs +++ b/graph/build.rs @@ -22,6 +22,7 @@ fn main() { tonic_build::configure() .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".sf.substreams.v1", "crate::substreams") + .extern_path(".sf.firehose.v2", "crate::firehose") .out_dir("src/substreams_rpc") .compile(&["proto/substreams-rpc.proto"], &["proto"]) .expect("Failed to compile Substreams RPC proto(s)"); diff --git a/graph/proto/substreams-rpc.proto b/graph/proto/substreams-rpc.proto index a0ba1b72037..28298458480 100644 --- a/graph/proto/substreams-rpc.proto +++ b/graph/proto/substreams-rpc.proto @@ -4,39 +4,46 @@ package sf.substreams.rpc.v2; import "google/protobuf/any.proto"; import "substreams.proto"; +import "firehose.proto"; -service Stream { - rpc Blocks(Request) returns (stream Response); +service EndpointInfo { + rpc Info(sf.firehose.v2.InfoRequest) returns (sf.firehose.v2.InfoResponse); } +service Stream { rpc Blocks(Request) returns (stream Response); } + message Request { int64 start_block_num = 1; string start_cursor = 2; uint64 stop_block_num = 3; // With final_block_only, you only receive blocks that are irreversible: - // 'final_block_height' will be equal to current block and no 'undo_signal' will ever be sent + // 'final_block_height' will be equal to current block and no 'undo_signal' + // will ever be sent bool final_blocks_only = 4; - // Substreams has two mode when executing your module(s) either development mode or production - // mode. Development and production modes impact the execution of Substreams, important aspects - // of execution include: + // Substreams has two mode when executing your module(s) either development + // mode or production mode. Development and production modes impact the + // execution of Substreams, important aspects of execution include: // * The time required to reach the first byte. // * The speed that large ranges get executed. // * The module logs and outputs sent back to the client. // - // By default, the engine runs in developer mode, with richer and deeper output. Differences - // between production and development modes include: - // * Forward parallel execution is enabled in production mode and disabled in development mode - // * The time required to reach the first byte in development mode is faster than in production mode. + // By default, the engine runs in developer mode, with richer and deeper + // output. Differences between production and development modes include: + // * Forward parallel execution is enabled in production mode and disabled in + // development mode + // * The time required to reach the first byte in development mode is faster + // than in production mode. // // Specific attributes of development mode include: // * The client will receive all of the executed module's logs. - // * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`). + // * It's possible to request specific store snapshots in the execution tree + // (via `debug_initial_store_snapshot_for_modules`). // * Multiple module's output is possible. // - // With production mode`, however, you trade off functionality for high speed enabling forward - // parallel execution of module ahead of time. + // With production mode`, however, you trade off functionality for high speed + // enabling forward parallel execution of module ahead of time. bool production_mode = 5; string output_module = 6; @@ -47,23 +54,24 @@ message Request { repeated string debug_initial_store_snapshot_for_modules = 10; } - message Response { oneof message { - SessionInit session = 1; // Always sent first - ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events. + SessionInit session = 1; // Always sent first + ModulesProgress progress = 2; // Progress of data preparation, before + // sending in the stream of `data` events. BlockScopedData block_scoped_data = 3; BlockUndoSignal block_undo_signal = 4; Error fatal_error = 5; - // Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set. + // Available only in developer mode, and only if + // `debug_initial_store_snapshot_for_modules` is set. InitialSnapshotData debug_snapshot_data = 10; - // Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set. + // Available only in developer mode, and only if + // `debug_initial_store_snapshot_for_modules` is set. InitialSnapshotComplete debug_snapshot_complete = 11; } } - // BlockUndoSignal informs you that every bit of data // with a block number above 'last_valid_block' has been reverted // on-chain. Delete that data and restart from 'last_valid_cursor' @@ -84,16 +92,14 @@ message BlockScopedData { repeated StoreModuleOutput debug_store_outputs = 11; } -message SessionInit { +message SessionInit { string trace_id = 1; uint64 resolved_start_block = 2; uint64 linear_handoff_block = 3; uint64 max_parallel_workers = 4; } -message InitialSnapshotComplete { - string cursor = 1; -} +message InitialSnapshotComplete { string cursor = 1; } message InitialSnapshotData { string module_name = 1; @@ -110,9 +116,9 @@ message MapModuleOutput { } // StoreModuleOutput are produced for store modules in development mode. -// It is not possible to retrieve store models in production, with parallelization -// enabled. If you need the deltas directly, write a pass through mapper module -// that will get them down to you. +// It is not possible to retrieve store models in production, with +// parallelization enabled. If you need the deltas directly, write a pass +// through mapper module that will get them down to you. message StoreModuleOutput { string name = 1; repeated StoreDelta debug_store_deltas = 2; @@ -121,8 +127,9 @@ message StoreModuleOutput { message OutputDebugInfo { repeated string logs = 1; - // LogsTruncated is a flag that tells you if you received all the logs or if they - // were truncated because you logged too much (fixed limit currently is set to 128 KiB). + // LogsTruncated is a flag that tells you if you received all the logs or if + // they were truncated because you logged too much (fixed limit currently is + // set to 128 KiB). bool logs_truncated = 2; bool cached = 3; } @@ -130,7 +137,8 @@ message OutputDebugInfo { // ModulesProgress is a message that is sent every 500ms message ModulesProgress { // previously: repeated ModuleProgress modules = 1; - // these previous `modules` messages were sent in bursts and are not sent anymore. + // these previous `modules` messages were sent in bursts and are not sent + // anymore. reserved 1; // List of jobs running on tier2 servers repeated Job running_jobs = 2; @@ -147,73 +155,82 @@ message ProcessedBytes { uint64 total_bytes_written = 2; } - message Error { string module = 1; string reason = 2; repeated string logs = 3; - // FailureLogsTruncated is a flag that tells you if you received all the logs or if they - // were truncated because you logged too much (fixed limit currently is set to 128 KiB). + // FailureLogsTruncated is a flag that tells you if you received all the logs + // or if they were truncated because you logged too much (fixed limit + // currently is set to 128 KiB). bool logs_truncated = 4; } - message Job { - uint32 stage = 1; - uint64 start_block = 2; - uint64 stop_block = 3; - uint64 processed_blocks = 4; - uint64 duration_ms = 5; + uint32 stage = 1; + uint64 start_block = 2; + uint64 stop_block = 3; + uint64 processed_blocks = 4; + uint64 duration_ms = 5; } message Stage { - repeated string modules = 1; - repeated BlockRange completed_ranges = 2; + repeated string modules = 1; + repeated BlockRange completed_ranges = 2; } -// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2 -// All the 'count' and 'time_ms' values may include duplicate for each stage going over that module +// ModuleStats gathers metrics and statistics from each module, running on tier1 +// or tier2 All the 'count' and 'time_ms' values may include duplicate for each +// stage going over that module message ModuleStats { - // name of the module - string name = 1; + // name of the module + string name = 1; - // total_processed_blocks is the sum of blocks sent to that module code - uint64 total_processed_block_count = 2; - // total_processing_time_ms is the sum of all time spent running that module code - uint64 total_processing_time_ms = 3; + // total_processed_blocks is the sum of blocks sent to that module code + uint64 total_processed_block_count = 2; + // total_processing_time_ms is the sum of all time spent running that module + // code + uint64 total_processing_time_ms = 3; - //// external_calls are chain-specific intrinsics, like "Ethereum RPC calls". - repeated ExternalCallMetric external_call_metrics = 4; + //// external_calls are chain-specific intrinsics, like "Ethereum RPC calls". + repeated ExternalCallMetric external_call_metrics = 4; - // total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...) - uint64 total_store_operation_time_ms = 5; - // total_store_read_count is the sum of all the store Read operations called from that module code - uint64 total_store_read_count = 6; + // total_store_operation_time_ms is the sum of all time spent running that + // module code waiting for a store operation (ex: read, write, delete...) + uint64 total_store_operation_time_ms = 5; + // total_store_read_count is the sum of all the store Read operations called + // from that module code + uint64 total_store_read_count = 6; - // total_store_write_count is the sum of all store Write operations called from that module code (store-only) - uint64 total_store_write_count = 10; + // total_store_write_count is the sum of all store Write operations called + // from that module code (store-only) + uint64 total_store_write_count = 10; - // total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only) - // note that DeletePrefix can be a costly operation on large stores - uint64 total_store_deleteprefix_count = 11; + // total_store_deleteprefix_count is the sum of all store DeletePrefix + // operations called from that module code (store-only) note that DeletePrefix + // can be a costly operation on large stores + uint64 total_store_deleteprefix_count = 11; - // store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only) - uint64 store_size_bytes = 12; + // store_size_bytes is the uncompressed size of the full KV store for that + // module, from the last 'merge' operation (store-only) + uint64 store_size_bytes = 12; - // total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only) - uint64 total_store_merging_time_ms = 13; + // total_store_merging_time_ms is the time spent merging partial stores into a + // full KV store for that module (store-only) + uint64 total_store_merging_time_ms = 13; - // store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way. - bool store_currently_merging = 14; + // store_currently_merging is true if there is a merging operation (partial + // store to full KV store) on the way. + bool store_currently_merging = 14; - // highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only) - uint64 highest_contiguous_block = 15; + // highest_contiguous_block is the highest block in the highest merged full KV + // store of that module (store-only) + uint64 highest_contiguous_block = 15; } message ExternalCallMetric { - string name = 1; - uint64 count = 2; - uint64 time_ms = 3; + string name = 1; + uint64 count = 2; + uint64 time_ms = 3; } message StoreDelta { diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index ebf27faa5a1..00b87ea21a4 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -1,3 +1,4 @@ +use crate::firehose::codec::InfoRequest; use crate::firehose::fetch_client::FetchClient; use crate::firehose::interceptors::AuthInterceptor; use crate::{ @@ -51,6 +52,7 @@ pub struct FirehoseEndpoint { pub filters_enabled: bool, pub compression_enabled: bool, pub subgraph_limit: SubgraphLimit, + is_substreams: bool, endpoint_metrics: Arc, channel: Channel, @@ -79,8 +81,15 @@ impl NetworkDetails for Arc { async fn provides_extended_blocks(&self) -> anyhow::Result { let info = self.clone().info().await?; + let pred = if info.chain_name.contains("arbitrum-one") + || info.chain_name.contains("optimism-mainnet") + { + |x: &String| x.starts_with("extended") || x == "hybrid" + } else { + |x: &String| x == "extended" + }; - Ok(info.block_features.iter().all(|x| x == "extended")) + Ok(info.block_features.iter().any(pred)) } } @@ -175,6 +184,7 @@ impl FirehoseEndpoint { compression_enabled: bool, subgraph_limit: SubgraphLimit, endpoint_metrics: Arc, + is_substreams_endpoint: bool, ) -> Self { let uri = url .as_ref() @@ -238,6 +248,7 @@ impl FirehoseEndpoint { subgraph_limit, endpoint_metrics, info_response: OnceCell::new(), + is_substreams: is_substreams_endpoint, } } @@ -306,7 +317,44 @@ impl FirehoseEndpoint { client } - fn new_substreams_client( + fn new_firehose_info_client(&self) -> crate::firehose::endpoint_info::Client { + let metrics = self.metrics_interceptor(); + let auth = self.auth.clone(); + + let mut client = crate::firehose::endpoint_info::Client::new(metrics, auth); + + if self.compression_enabled { + client = client.with_compression(); + } + + client = client.with_max_message_size(self.max_message_size()); + client + } + + fn new_substreams_info_client( + &self, + ) -> crate::substreams_rpc::endpoint_info_client::EndpointInfoClient< + InterceptedService, impl tonic::service::Interceptor>, + > { + let metrics = self.metrics_interceptor(); + + let mut client = + crate::substreams_rpc::endpoint_info_client::EndpointInfoClient::with_interceptor( + metrics, + self.auth.clone(), + ) + .accept_compressed(CompressionEncoding::Gzip); + + if self.compression_enabled { + client = client.send_compressed(CompressionEncoding::Gzip); + } + + client = client.max_decoding_message_size(self.max_message_size()); + + client + } + + fn new_substreams_streaming_client( &self, ) -> substreams_rpc::stream_client::StreamClient< InterceptedService, impl tonic::service::Interceptor>, @@ -595,7 +643,7 @@ impl FirehoseEndpoint { request: substreams_rpc::Request, headers: &ConnectionHeaders, ) -> Result, anyhow::Error> { - let mut client = self.new_substreams_client(); + let mut client = self.new_substreams_streaming_client(); let request = headers.add_to_request(request); let response_stream = client.blocks(request).await?; let block_stream = response_stream.into_inner(); @@ -610,18 +658,20 @@ impl FirehoseEndpoint { self.info_response .get_or_try_init(move || async move { - let metrics = endpoint.metrics_interceptor(); - let auth = endpoint.auth.clone(); - - let mut client = crate::firehose::endpoint_info::Client::new(metrics, auth); + if endpoint.is_substreams { + let mut client = endpoint.new_substreams_info_client(); + + client + .info(InfoRequest {}) + .await + .map(|r| r.into_inner()) + .map_err(anyhow::Error::from) + .and_then(|e| e.try_into()) + } else { + let mut client = endpoint.new_firehose_info_client(); - if endpoint.compression_enabled { - client = client.with_compression(); + client.info().await } - - client = client.with_max_message_size(endpoint.max_message_size()); - - client.info().await }) .await .map(ToOwned::to_owned) @@ -709,6 +759,7 @@ mod test { false, SubgraphLimit::Unlimited, Arc::new(EndpointMetrics::mock()), + false, ))]; let endpoints = FirehoseEndpoints::for_testing(endpoint); @@ -741,6 +792,7 @@ mod test { false, SubgraphLimit::Limit(2), Arc::new(EndpointMetrics::mock()), + false, ))]; let endpoints = FirehoseEndpoints::for_testing(endpoint); @@ -768,6 +820,7 @@ mod test { false, SubgraphLimit::Disabled, Arc::new(EndpointMetrics::mock()), + false, ))]; let endpoints = FirehoseEndpoints::for_testing(endpoint); @@ -794,6 +847,7 @@ mod test { false, SubgraphLimit::Unlimited, endpoint_metrics.clone(), + false, )); let high_error_adapter2 = Arc::new(FirehoseEndpoint::new( "high_error".to_string(), @@ -804,6 +858,7 @@ mod test { false, SubgraphLimit::Unlimited, endpoint_metrics.clone(), + false, )); let low_availability = Arc::new(FirehoseEndpoint::new( "low availability".to_string(), @@ -814,6 +869,7 @@ mod test { false, SubgraphLimit::Limit(2), endpoint_metrics.clone(), + false, )); let high_availability = Arc::new(FirehoseEndpoint::new( "high availability".to_string(), @@ -824,6 +880,7 @@ mod test { false, SubgraphLimit::Unlimited, endpoint_metrics.clone(), + false, )); endpoint_metrics.report_for_test(&high_error_adapter1.provider, false); diff --git a/graph/src/substreams_rpc/sf.firehose.v2.rs b/graph/src/substreams_rpc/sf.firehose.v2.rs new file mode 100644 index 00000000000..ac86a47e505 --- /dev/null +++ b/graph/src/substreams_rpc/sf.firehose.v2.rs @@ -0,0 +1,884 @@ +// This file is @generated by prost-build. +/// Generated client implementations. +pub mod stream_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct StreamClient { + inner: tonic::client::Grpc, + } + impl StreamClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl StreamClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> StreamClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + StreamClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn blocks( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sf.firehose.v2.Stream/Blocks", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("sf.firehose.v2.Stream", "Blocks")); + self.inner.server_streaming(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod fetch_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct FetchClient { + inner: tonic::client::Grpc, + } + impl FetchClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl FetchClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> FetchClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + FetchClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn block( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sf.firehose.v2.Fetch/Block", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("sf.firehose.v2.Fetch", "Block")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. +pub mod endpoint_info_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct EndpointInfoClient { + inner: tonic::client::Grpc, + } + impl EndpointInfoClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl EndpointInfoClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> EndpointInfoClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + EndpointInfoClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sf.firehose.v2.EndpointInfo/Info", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("sf.firehose.v2.EndpointInfo", "Info")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod stream_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with StreamServer. + #[async_trait] + pub trait Stream: Send + Sync + 'static { + /// Server streaming response type for the Blocks method. + type BlocksStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + Send + + 'static; + async fn blocks( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct StreamServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl StreamServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for StreamServer + where + T: Stream, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/sf.firehose.v2.Stream/Blocks" => { + #[allow(non_camel_case_types)] + struct BlocksSvc(pub Arc); + impl< + T: Stream, + > tonic::server::ServerStreamingService + for BlocksSvc { + type Response = crate::firehose::Response; + type ResponseStream = T::BlocksStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::blocks(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = BlocksSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for StreamServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for StreamServer { + const NAME: &'static str = "sf.firehose.v2.Stream"; + } +} +/// Generated server implementations. +pub mod fetch_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with FetchServer. + #[async_trait] + pub trait Fetch: Send + Sync + 'static { + async fn block( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct FetchServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl FetchServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for FetchServer + where + T: Fetch, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/sf.firehose.v2.Fetch/Block" => { + #[allow(non_camel_case_types)] + struct BlockSvc(pub Arc); + impl< + T: Fetch, + > tonic::server::UnaryService + for BlockSvc { + type Response = crate::firehose::SingleBlockResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::block(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = BlockSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for FetchServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for FetchServer { + const NAME: &'static str = "sf.firehose.v2.Fetch"; + } +} +/// Generated server implementations. +pub mod endpoint_info_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with EndpointInfoServer. + #[async_trait] + pub trait EndpointInfo: Send + Sync + 'static { + async fn info( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct EndpointInfoServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl EndpointInfoServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for EndpointInfoServer + where + T: EndpointInfo, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/sf.firehose.v2.EndpointInfo/Info" => { + #[allow(non_camel_case_types)] + struct InfoSvc(pub Arc); + impl< + T: EndpointInfo, + > tonic::server::UnaryService + for InfoSvc { + type Response = crate::firehose::InfoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::info(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = InfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for EndpointInfoServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for EndpointInfoServer { + const NAME: &'static str = "sf.firehose.v2.EndpointInfo"; + } +} diff --git a/graph/src/substreams_rpc/sf.substreams.rpc.v2.rs b/graph/src/substreams_rpc/sf.substreams.rpc.v2.rs index 19b8d0493f0..38a793e666f 100644 --- a/graph/src/substreams_rpc/sf.substreams.rpc.v2.rs +++ b/graph/src/substreams_rpc/sf.substreams.rpc.v2.rs @@ -9,28 +9,32 @@ pub struct Request { #[prost(uint64, tag = "3")] pub stop_block_num: u64, /// With final_block_only, you only receive blocks that are irreversible: - /// 'final_block_height' will be equal to current block and no 'undo_signal' will ever be sent + /// 'final_block_height' will be equal to current block and no 'undo_signal' + /// will ever be sent #[prost(bool, tag = "4")] pub final_blocks_only: bool, - /// Substreams has two mode when executing your module(s) either development mode or production - /// mode. Development and production modes impact the execution of Substreams, important aspects - /// of execution include: + /// Substreams has two mode when executing your module(s) either development + /// mode or production mode. Development and production modes impact the + /// execution of Substreams, important aspects of execution include: /// * The time required to reach the first byte. /// * The speed that large ranges get executed. /// * The module logs and outputs sent back to the client. /// - /// By default, the engine runs in developer mode, with richer and deeper output. Differences - /// between production and development modes include: - /// * Forward parallel execution is enabled in production mode and disabled in development mode - /// * The time required to reach the first byte in development mode is faster than in production mode. + /// By default, the engine runs in developer mode, with richer and deeper + /// output. Differences between production and development modes include: + /// * Forward parallel execution is enabled in production mode and disabled in + /// development mode + /// * The time required to reach the first byte in development mode is faster + /// than in production mode. /// /// Specific attributes of development mode include: /// * The client will receive all of the executed module's logs. - /// * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`). + /// * It's possible to request specific store snapshots in the execution tree + /// (via `debug_initial_store_snapshot_for_modules`). /// * Multiple module's output is possible. /// - /// With production mode`, however, you trade off functionality for high speed enabling forward - /// parallel execution of module ahead of time. + /// With production mode`, however, you trade off functionality for high speed + /// enabling forward parallel execution of module ahead of time. #[prost(bool, tag = "5")] pub production_mode: bool, #[prost(string, tag = "6")] @@ -57,19 +61,22 @@ pub mod response { /// Always sent first #[prost(message, tag = "1")] Session(super::SessionInit), - /// Progress of data preparation, before sending in the stream of `data` events. + /// Progress of data preparation, before #[prost(message, tag = "2")] Progress(super::ModulesProgress), + /// sending in the stream of `data` events. #[prost(message, tag = "3")] BlockScopedData(super::BlockScopedData), #[prost(message, tag = "4")] BlockUndoSignal(super::BlockUndoSignal), #[prost(message, tag = "5")] FatalError(super::Error), - /// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set. + /// Available only in developer mode, and only if + /// `debug_initial_store_snapshot_for_modules` is set. #[prost(message, tag = "10")] DebugSnapshotData(super::InitialSnapshotData), - /// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set. + /// Available only in developer mode, and only if + /// `debug_initial_store_snapshot_for_modules` is set. #[prost(message, tag = "11")] DebugSnapshotComplete(super::InitialSnapshotComplete), } @@ -144,9 +151,9 @@ pub struct MapModuleOutput { pub debug_info: ::core::option::Option, } /// StoreModuleOutput are produced for store modules in development mode. -/// It is not possible to retrieve store models in production, with parallelization -/// enabled. If you need the deltas directly, write a pass through mapper module -/// that will get them down to you. +/// It is not possible to retrieve store models in production, with +/// parallelization enabled. If you need the deltas directly, write a pass +/// through mapper module that will get them down to you. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct StoreModuleOutput { @@ -162,8 +169,9 @@ pub struct StoreModuleOutput { pub struct OutputDebugInfo { #[prost(string, repeated, tag = "1")] pub logs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - /// LogsTruncated is a flag that tells you if you received all the logs or if they - /// were truncated because you logged too much (fixed limit currently is set to 128 KiB). + /// LogsTruncated is a flag that tells you if you received all the logs or if + /// they were truncated because you logged too much (fixed limit currently is + /// set to 128 KiB). #[prost(bool, tag = "2")] pub logs_truncated: bool, #[prost(bool, tag = "3")] @@ -202,8 +210,9 @@ pub struct Error { pub reason: ::prost::alloc::string::String, #[prost(string, repeated, tag = "3")] pub logs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - /// FailureLogsTruncated is a flag that tells you if you received all the logs or if they - /// were truncated because you logged too much (fixed limit currently is set to 128 KiB). + /// FailureLogsTruncated is a flag that tells you if you received all the logs + /// or if they were truncated because you logged too much (fixed limit + /// currently is set to 128 KiB). #[prost(bool, tag = "4")] pub logs_truncated: bool, } @@ -229,8 +238,9 @@ pub struct Stage { #[prost(message, repeated, tag = "2")] pub completed_ranges: ::prost::alloc::vec::Vec, } -/// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2 -/// All the 'count' and 'time_ms' values may include duplicate for each stage going over that module +/// ModuleStats gathers metrics and statistics from each module, running on tier1 +/// or tier2 All the 'count' and 'time_ms' values may include duplicate for each +/// stage going over that module #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ModuleStats { @@ -240,35 +250,44 @@ pub struct ModuleStats { /// total_processed_blocks is the sum of blocks sent to that module code #[prost(uint64, tag = "2")] pub total_processed_block_count: u64, - /// total_processing_time_ms is the sum of all time spent running that module code + /// total_processing_time_ms is the sum of all time spent running that module + /// code #[prost(uint64, tag = "3")] pub total_processing_time_ms: u64, /// // external_calls are chain-specific intrinsics, like "Ethereum RPC calls". #[prost(message, repeated, tag = "4")] pub external_call_metrics: ::prost::alloc::vec::Vec, - /// total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...) + /// total_store_operation_time_ms is the sum of all time spent running that + /// module code waiting for a store operation (ex: read, write, delete...) #[prost(uint64, tag = "5")] pub total_store_operation_time_ms: u64, - /// total_store_read_count is the sum of all the store Read operations called from that module code + /// total_store_read_count is the sum of all the store Read operations called + /// from that module code #[prost(uint64, tag = "6")] pub total_store_read_count: u64, - /// total_store_write_count is the sum of all store Write operations called from that module code (store-only) + /// total_store_write_count is the sum of all store Write operations called + /// from that module code (store-only) #[prost(uint64, tag = "10")] pub total_store_write_count: u64, - /// total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only) - /// note that DeletePrefix can be a costly operation on large stores + /// total_store_deleteprefix_count is the sum of all store DeletePrefix + /// operations called from that module code (store-only) note that DeletePrefix + /// can be a costly operation on large stores #[prost(uint64, tag = "11")] pub total_store_deleteprefix_count: u64, - /// store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only) + /// store_size_bytes is the uncompressed size of the full KV store for that + /// module, from the last 'merge' operation (store-only) #[prost(uint64, tag = "12")] pub store_size_bytes: u64, - /// total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only) + /// total_store_merging_time_ms is the time spent merging partial stores into a + /// full KV store for that module (store-only) #[prost(uint64, tag = "13")] pub total_store_merging_time_ms: u64, - /// store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way. + /// store_currently_merging is true if there is a merging operation (partial + /// store to full KV store) on the way. #[prost(bool, tag = "14")] pub store_currently_merging: bool, - /// highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only) + /// highest_contiguous_block is the highest block in the highest merged full KV + /// store of that module (store-only) #[prost(uint64, tag = "15")] pub highest_contiguous_block: u64, } @@ -350,6 +369,118 @@ pub struct BlockRange { pub end_block: u64, } /// Generated client implementations. +pub mod endpoint_info_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct EndpointInfoClient { + inner: tonic::client::Grpc, + } + impl EndpointInfoClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl EndpointInfoClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> EndpointInfoClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + EndpointInfoClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sf.substreams.rpc.v2.EndpointInfo/Info", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("sf.substreams.rpc.v2.EndpointInfo", "Info")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated client implementations. pub mod stream_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; @@ -462,6 +593,187 @@ pub mod stream_client { } } /// Generated server implementations. +pub mod endpoint_info_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with EndpointInfoServer. + #[async_trait] + pub trait EndpointInfo: Send + Sync + 'static { + async fn info( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct EndpointInfoServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl EndpointInfoServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for EndpointInfoServer + where + T: EndpointInfo, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/sf.substreams.rpc.v2.EndpointInfo/Info" => { + #[allow(non_camel_case_types)] + struct InfoSvc(pub Arc); + impl< + T: EndpointInfo, + > tonic::server::UnaryService + for InfoSvc { + type Response = crate::firehose::InfoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::info(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = InfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for EndpointInfoServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for EndpointInfoServer { + const NAME: &'static str = "sf.substreams.rpc.v2.EndpointInfo"; + } +} +/// Generated server implementations. pub mod stream_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; diff --git a/node/src/chain.rs b/node/src/chain.rs index 289c0580c2e..beeb366ad61 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -90,6 +90,7 @@ pub fn create_substreams_networks( firehose.compression_enabled(), SubgraphLimit::Unlimited, endpoint_metrics.clone(), + true, ))); } } @@ -157,6 +158,7 @@ pub fn create_firehose_networks( firehose.compression_enabled(), firehose.limit_for(&config.node), endpoint_metrics.cheap_clone(), + false, ))); } } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 4e8127875a0..1fc43e495b8 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -108,6 +108,7 @@ impl CommonChainConfig { false, SubgraphLimit::Unlimited, Arc::new(EndpointMetrics::mock()), + false, ))]); Self {