diff --git a/.gitignore b/.gitignore index 9f31dd40..dab176a6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target -CLAUDE.md \ No newline at end of file +AGENTS.md \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 95343ff7..536e5705 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3869,6 +3869,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "intmax2-cli" version = "0.1.33" @@ -3925,6 +3937,7 @@ dependencies = [ "gloo-timers", "hashbrown 0.15.4", "hex", + "instant", "intmax2-interfaces", "intmax2-zkp", "itertools 0.14.0", diff --git a/client-sdk/Cargo.toml b/client-sdk/Cargo.toml index 9777aade..a58c0bfe 100644 --- a/client-sdk/Cargo.toml +++ b/client-sdk/Cargo.toml @@ -47,3 +47,6 @@ futures = "0.3.31" csv = "1.3.1" gloo-timers = { version = "0.3.0", features = ["futures"] } coins-bip39 = "0.12.0" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +instant = { version = "0.1.13", features = ["wasm-bindgen"] } diff --git a/client-sdk/src/external_api/s3_store_vault.rs b/client-sdk/src/external_api/s3_store_vault.rs index c5f80bca..0df99b2f 100644 --- a/client-sdk/src/external_api/s3_store_vault.rs +++ b/client-sdk/src/external_api/s3_store_vault.rs @@ -1,6 +1,7 @@ use super::utils::query::post_request; use crate::external_api::utils::{ query::{build_client, REQWEST_TIMEOUT}, + rate_limit::{limiter_from_env, RequestRateLimiter}, retry::with_retry, }; use async_trait::async_trait; @@ -26,14 +27,19 @@ use intmax2_interfaces::{ }; use intmax2_zkp::ethereum_types::bytes32::Bytes32; use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::Arc; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly +const STORE_VAULT_DEFAULT_RPS: f64 = 1.0; +const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] pub struct S3StoreVaultClient { client: Client, base_url: String, + rate_limiter: Arc, } impl S3StoreVaultClient { @@ -41,8 +47,30 @@ impl S3StoreVaultClient { S3StoreVaultClient { client: build_client(), base_url: base_url.to_string(), + rate_limiter: limiter_from_env( + "STORE_VAULT_MAX_RPS", + "STORE_VAULT_MAX_BURST", + STORE_VAULT_DEFAULT_RPS, + STORE_VAULT_DEFAULT_BURST_MULTIPLIER, + ), } } + + async fn post_with_limit( + &self, + endpoint: &str, + body: Option<&B>, + ) -> Result + where + B: Serialize, + R: DeserializeOwned, + { + self.rate_limiter + .run(1, || async { + post_request(&self.client, &self.base_url, endpoint, body).await + }) + .await + } } #[async_trait(?Send)] @@ -62,13 +90,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { digest, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3PreSaveSnapshotResponse = post_request( - &self.client, - &self.base_url, - "/pre-save-snapshot", - Some(&request_with_auth), - ) - .await?; + let response: S3PreSaveSnapshotResponse = self + .post_with_limit("/pre-save-snapshot", Some(&request_with_auth)) + .await?; // upload data to s3 upload_s3(&self.client, &response.presigned_url, data).await?; @@ -81,13 +105,8 @@ impl StoreVaultClientInterface for S3StoreVaultClient { digest, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let () = post_request( - &self.client, - &self.base_url, - "/save-snapshot", - Some(&request_with_auth), - ) - .await?; + self.post_with_limit::<_, ()>("/save-snapshot", Some(&request_with_auth)) + .await?; Ok(()) } @@ -103,13 +122,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3GetSnapshotResponse = post_request( - &self.client, - &self.base_url, - "/get-snapshot", - Some(&request_with_auth), - ) - .await?; + let response: S3GetSnapshotResponse = self + .post_with_limit("/get-snapshot", Some(&request_with_auth)) + .await?; match response.presigned_url { Some(url) => { @@ -139,13 +154,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { let digests = data.iter().map(|entry| entry.digest).collect::>(); let request = S3SaveDataBatchRequest { data }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3SaveDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/save-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: S3SaveDataBatchResponse = self + .post_with_limit("/save-data-batch", Some(&request_with_auth)) + .await?; let data = chunk .iter() @@ -173,13 +184,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3GetDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: S3GetDataBatchResponse = self + .post_with_limit("/get-data-batch", Some(&request_with_auth)) + .await?; let urls = response .presigned_urls_with_meta .iter() @@ -239,13 +246,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { }, auth: auth.clone(), }; - let response: S3GetDataSequenceResponse = post_request( - &self.client, - &self.base_url, - "/get-data-sequence", - Some(&request_with_auth), - ) - .await?; + let response: S3GetDataSequenceResponse = self + .post_with_limit("/get-data-sequence", Some(&request_with_auth)) + .await?; let urls = response .presigned_urls_with_meta diff --git a/client-sdk/src/external_api/store_vault_server.rs b/client-sdk/src/external_api/store_vault_server.rs index 756a8002..42d6d71f 100644 --- a/client-sdk/src/external_api/store_vault_server.rs +++ b/client-sdk/src/external_api/store_vault_server.rs @@ -19,18 +19,26 @@ use intmax2_interfaces::{ }; use intmax2_zkp::ethereum_types::bytes32::Bytes32; use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::Arc; -use crate::external_api::utils::query::build_client; +use crate::external_api::utils::{ + query::build_client, + rate_limit::{limiter_from_env, RequestRateLimiter}, +}; use super::utils::query::post_request; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly +const STORE_VAULT_DEFAULT_RPS: f64 = 1.0; +const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] pub struct StoreVaultServerClient { client: Client, base_url: String, + rate_limiter: Arc, } impl StoreVaultServerClient { @@ -38,8 +46,30 @@ impl StoreVaultServerClient { StoreVaultServerClient { client: build_client(), base_url: base_url.to_string(), + rate_limiter: limiter_from_env( + "STORE_VAULT_MAX_RPS", + "STORE_VAULT_MAX_BURST", + STORE_VAULT_DEFAULT_RPS, + STORE_VAULT_DEFAULT_BURST_MULTIPLIER, + ), } } + + async fn post_with_limit( + &self, + endpoint: &str, + body: Option<&B>, + ) -> Result + where + B: Serialize, + R: DeserializeOwned, + { + self.rate_limiter + .run(1, || async { + post_request(&self.client, &self.base_url, endpoint, body).await + }) + .await + } } #[async_trait(?Send)] @@ -59,13 +89,8 @@ impl StoreVaultClientInterface for StoreVaultServerClient { prev_digest, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - post_request::<_, ()>( - &self.client, - &self.base_url, - "/save-snapshot", - Some(&request_with_auth), - ) - .await?; + self.post_with_limit::<_, ()>("/save-snapshot", Some(&request_with_auth)) + .await?; Ok(()) } @@ -80,13 +105,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: GetSnapshotResponse = post_request( - &self.client, - &self.base_url, - "/get-snapshot", - Some(&request_with_auth), - ) - .await?; + let response: GetSnapshotResponse = self + .post_with_limit("/get-snapshot", Some(&request_with_auth)) + .await?; Ok(response.data) } @@ -102,13 +123,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { data: chunk.to_vec(), }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: SaveDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/save-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: SaveDataBatchResponse = self + .post_with_limit("/save-data-batch", Some(&request_with_auth)) + .await?; all_digests.extend(response.digests); } Ok(all_digests) @@ -129,13 +146,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: GetDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: GetDataBatchResponse = self + .post_with_limit("/get-data-batch", Some(&request_with_auth)) + .await?; all_data.extend(response.data); } Ok(all_data) @@ -177,13 +190,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { }, auth: auth.clone(), }; - let response: GetDataSequenceResponse = post_request( - &self.client, - &self.base_url, - "/get-data-sequence", - Some(&request_with_auth), - ) - .await?; + let response: GetDataSequenceResponse = self + .post_with_limit("/get-data-sequence", Some(&request_with_auth)) + .await?; Ok((response.data, response.cursor_response)) } } diff --git a/client-sdk/src/external_api/utils/mod.rs b/client-sdk/src/external_api/utils/mod.rs index 72853cbc..6a391877 100644 --- a/client-sdk/src/external_api/utils/mod.rs +++ b/client-sdk/src/external_api/utils/mod.rs @@ -1,3 +1,4 @@ pub mod query; +pub mod rate_limit; pub mod retry; pub mod time; diff --git a/client-sdk/src/external_api/utils/query.rs b/client-sdk/src/external_api/utils/query.rs index ac9c49cd..e76df5e9 100644 --- a/client-sdk/src/external_api/utils/query.rs +++ b/client-sdk/src/external_api/utils/query.rs @@ -4,7 +4,7 @@ use intmax2_interfaces::api::error::ServerError; use reqwest::{header, Client, Response, Url}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use crate::external_api::utils::retry::with_retry; +use crate::external_api::utils::{retry::with_retry, time::sleep_for}; /// Timeout for reqwest requests. /// Because WASM only accepts timeout in the request builder, not the client builder, @@ -13,6 +13,8 @@ pub const REQWEST_TIMEOUT: Duration = Duration::from_secs(30); /// Maximum response body size for logging (in characters) const MAX_RESPONSE_LOG_SIZE: usize = 500; +const DEFAULT_BAD_GATEWAY_RETRIES: u32 = 1; +const DEFAULT_BAD_GATEWAY_RETRY_DELAY_SECS: u64 = 30; #[derive(Debug, Deserialize)] struct ErrorResponse { @@ -133,9 +135,42 @@ fn serialize_body_for_logging(body: Option<&B>) -> Result Result { - with_retry(|| async { request_builder.try_clone().unwrap().send().await }) - .await - .map_err(|e| ServerError::NetworkError(e.to_string())) + let max_502_retries: u32 = std::env::var("BAD_GATEWAY_RETRY_LIMIT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_BAD_GATEWAY_RETRIES); + + let retry_delay_secs: u64 = std::env::var("BAD_GATEWAY_RETRY_DELAY_SECS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_BAD_GATEWAY_RETRY_DELAY_SECS); + + let mut bad_gateway_retries: u32 = 0; + + loop { + let response = with_retry(|| async { request_builder.try_clone().unwrap().send().await }) + .await + .map_err(|e| ServerError::NetworkError(e.to_string()))?; + + let status = response.status().as_u16(); + + if status != 502 { + return Ok(response); + } + + if bad_gateway_retries >= max_502_retries { + return Ok(response); + } + + log::warn!( + "Received 502 Bad Gateway from upstream. Waiting {delay}s before retry (attempt {}/{})", + bad_gateway_retries + 1, + max_502_retries, + delay = retry_delay_secs + ); + sleep_for(retry_delay_secs).await; + bad_gateway_retries += 1; + } } /// Handles HTTP response, including error cases and deserialization diff --git a/client-sdk/src/external_api/utils/rate_limit.rs b/client-sdk/src/external_api/utils/rate_limit.rs new file mode 100644 index 00000000..7b3ec080 --- /dev/null +++ b/client-sdk/src/external_api/utils/rate_limit.rs @@ -0,0 +1,199 @@ +use std::{f64, future::Future, sync::Arc, time::Duration}; + +use tokio::sync::Mutex; + +#[cfg(target_arch = "wasm32")] +use instant::Instant; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::Instant; + +#[cfg(target_arch = "wasm32")] +async fn sleep(duration: Duration) { + gloo_timers::future::sleep(duration).await; +} + +#[cfg(not(target_arch = "wasm32"))] +async fn sleep(duration: Duration) { + tokio::time::sleep(duration).await; +} + +/// Simple token bucket rate limiter that works on native and WASM targets. +#[derive(Debug)] +pub struct RequestRateLimiter { + state: Mutex, + refill_rate_per_sec: f64, + capacity: f64, +} + +#[derive(Debug)] +struct LimiterState { + tokens: f64, + last_refill: Instant, +} + +impl RequestRateLimiter { + /// Create a rate limiter with a maximum burst (`capacity`) and steady refill rate (`permits_per_second`). + /// + /// # Panics + /// Panics if either parameter is zero. + pub fn new(permits_per_second: f64, capacity: f64) -> Self { + assert!( + permits_per_second > 0.0, + "permits_per_second must be positive" + ); + assert!(capacity > 0.0, "capacity must be positive"); + + RequestRateLimiter { + state: Mutex::new(LimiterState { + tokens: capacity, + last_refill: Instant::now(), + }), + refill_rate_per_sec: permits_per_second, + capacity, + } + } + + /// Acquire a single permit before proceeding. + pub async fn acquire(&self) { + self.acquire_permits(1).await; + } + + /// Acquire `permits` units before proceeding. + pub async fn acquire_permits(&self, permits: u32) { + assert!(permits > 0, "permits must be positive"); + let permits = permits as f64; + + loop { + let mut state = self.state.lock().await; + let now = Instant::now(); + let elapsed = now - state.last_refill; + + if elapsed > Duration::from_secs(0) { + let refill_amount = elapsed.as_secs_f64() * self.refill_rate_per_sec; + state.tokens = (state.tokens + refill_amount).min(self.capacity); + state.last_refill = now; + } + + if state.tokens >= permits { + state.tokens -= permits; + break; + } + + let deficit = permits - state.tokens; + let wait_seconds = deficit / self.refill_rate_per_sec; + let sleep_duration = Duration::from_secs_f64(wait_seconds.clamp(0.0, f64::MAX)); + + // Drop the lock before awaiting. + drop(state); + sleep(sleep_duration).await; + } + } + + /// Acquire permits, then execute the provided future. + pub async fn run(&self, permits: u32, op: F) -> T + where + F: FnOnce() -> Fut, + Fut: Future, + { + self.acquire_permits(permits).await; + op().await + } +} + +fn parse_permits_per_second(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + let lower = trimmed.to_ascii_lowercase(); + + if let Some(stripped) = lower.strip_suffix("ms") { + let millis: f64 = stripped.trim().parse().ok()?; + if millis <= 0.0 { + return None; + } + return Some(1_000.0 / millis); + } + + if let Some(stripped) = lower.strip_suffix("s") { + let seconds: f64 = stripped.trim().parse().ok()?; + if seconds <= 0.0 { + return None; + } + return Some(1.0 / seconds); + } + + trimmed.parse::().ok().filter(|v| *v > 0.0) +} + +fn parse_capacity(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + trimmed.parse::().ok().filter(|value| *value > 0.0) +} + +pub fn limiter_from_env( + permits_var: &str, + burst_var: &str, + default_rps: f64, + default_burst_multiplier: u32, +) -> Arc { + let permits_per_second = std::env::var(permits_var) + .ok() + .and_then(|value| parse_permits_per_second(&value)) + .filter(|v| *v > 0.0) + .unwrap_or(default_rps); + + let default_capacity = + (permits_per_second * default_burst_multiplier as f64).max(permits_per_second); + + let capacity = std::env::var(burst_var) + .ok() + .and_then(|value| parse_capacity(&value)) + .filter(|v| *v > 0.0) + .unwrap_or(default_capacity); + + Arc::new(RequestRateLimiter::new(permits_per_second, capacity)) +} + +#[cfg(test)] +mod tests { + use super::*; + #[tokio::test] + async fn waits_until_tokens_refilled() { + let limiter = RequestRateLimiter::new(5.0, 5.0); + + // Consume entire bucket instantly. + limiter.acquire_permits(5).await; + + let before = Instant::now(); + limiter.acquire().await; + let waited = Instant::now() - before; + + assert!( + waited >= Duration::from_millis(180), + "expected to wait for refill, but only waited {waited:?}", + ); + } + + #[test] + fn parses_seconds_interval_into_rps() { + let parsed = parse_permits_per_second("2s").expect("should parse"); + assert!((parsed - 0.5).abs() < f64::EPSILON); + } + + #[test] + fn parses_millis_interval_into_rps() { + let parsed = parse_permits_per_second("500ms").expect("should parse"); + assert!((parsed - 2.0).abs() < f64::EPSILON); + } + + #[test] + fn parses_float_rps() { + let parsed = parse_permits_per_second("0.25").expect("should parse"); + assert!((parsed - 0.25).abs() < f64::EPSILON); + } +} diff --git a/client-sdk/src/external_api/validity_prover.rs b/client-sdk/src/external_api/validity_prover.rs index fbe5260d..88590115 100644 --- a/client-sdk/src/external_api/validity_prover.rs +++ b/client-sdk/src/external_api/validity_prover.rs @@ -29,10 +29,13 @@ use plonky2::{ plonk::{config::PoseidonGoldilocksConfig, proof::ProofWithPublicInputs}, }; use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::Arc; -use crate::external_api::utils::query::build_client; - -use super::utils::query::{get_request, post_request}; +use crate::external_api::utils::{ + query::{build_client, get_request, post_request}, + rate_limit::{limiter_from_env, RequestRateLimiter}, +}; type F = GoldilocksField; type C = PoseidonGoldilocksConfig; @@ -42,6 +45,7 @@ const D: usize = 2; pub struct ValidityProverClient { client: Client, base_url: String, + rate_limiter: Arc, } impl ValidityProverClient { @@ -49,43 +53,68 @@ impl ValidityProverClient { ValidityProverClient { client: build_client(), base_url: base_url.to_string(), + rate_limiter: limiter_from_env( + "VALIDITY_PROVER_MAX_RPS", + "VALIDITY_PROVER_MAX_BURST", + DEFAULT_RPS, + DEFAULT_BURST_MULTIPLIER, + ), } } + + async fn get_with_limit( + &self, + endpoint: &str, + query: Option<&Q>, + ) -> Result + where + Q: Serialize, + R: DeserializeOwned, + { + self.rate_limiter.acquire().await; + get_request(&self.client, &self.base_url, endpoint, query).await + } + + async fn post_with_limit( + &self, + endpoint: &str, + body: Option<&B>, + ) -> Result + where + B: Serialize, + R: DeserializeOwned, + { + self.rate_limiter.acquire().await; + post_request(&self.client, &self.base_url, endpoint, body).await + } } #[async_trait(?Send)] impl ValidityProverClientInterface for ValidityProverClient { async fn get_block_number(&self) -> Result { let response: GetBlockNumberResponse = - get_request::<(), _>(&self.client, &self.base_url, "/block-number", None).await?; + self.get_with_limit::<(), _>("/block-number", None).await?; Ok(response.block_number) } async fn get_validity_proof_block_number(&self) -> Result { - let response: GetBlockNumberResponse = get_request::<(), _>( - &self.client, - &self.base_url, - "/validity-proof-block-number", - None, - ) - .await?; + let response: GetBlockNumberResponse = self + .get_with_limit::<(), _>("/validity-proof-block-number", None) + .await?; Ok(response.block_number) } async fn get_next_deposit_index(&self) -> Result { - let response: GetNextDepositIndexResponse = - get_request::<(), _>(&self.client, &self.base_url, "/next-deposit-index", None).await?; + let response: GetNextDepositIndexResponse = self + .get_with_limit::<(), _>("/next-deposit-index", None) + .await?; Ok(response.deposit_index) } async fn get_latest_included_deposit_index(&self) -> Result, ServerError> { - let response: GetLatestIncludedDepositIndexResponse = get_request::<(), _>( - &self.client, - &self.base_url, - "/latest-included-deposit-index", - None, - ) - .await?; + let response: GetLatestIncludedDepositIndexResponse = self + .get_with_limit::<(), _>("/latest-included-deposit-index", None) + .await?; Ok(response.deposit_index) } @@ -102,13 +131,9 @@ impl ValidityProverClientInterface for ValidityProverClient { leaf_block_number, is_prev_account_tree, }; - let response: GetUpdateWitnessResponse = get_request( - &self.client, - &self.base_url, - "/get-update-witness", - Some(&query), - ) - .await?; + let response: GetUpdateWitnessResponse = self + .get_with_limit("/get-update-witness", Some(&query)) + .await?; Ok(response.update_witness) } @@ -117,13 +142,9 @@ impl ValidityProverClientInterface for ValidityProverClient { pubkey_salt_hash: Bytes32, ) -> Result, ServerError> { let query = GetDepositInfoQuery { pubkey_salt_hash }; - let response: GetDepositInfoResponse = get_request( - &self.client, - &self.base_url, - "/get-deposit-info", - Some(&query), - ) - .await?; + let response: GetDepositInfoResponse = self + .get_with_limit("/get-deposit-info", Some(&query)) + .await?; Ok(response.deposit_info) } @@ -138,13 +159,9 @@ impl ValidityProverClientInterface for ValidityProverClient { pubkey_salt_hashes: chunk.to_vec(), }; - let response: GetDepositInfoBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-deposit-info-batch", - Some(&request), - ) - .await?; + let response: GetDepositInfoBatchResponse = self + .post_with_limit("/get-deposit-info-batch", Some(&request)) + .await?; all_deposit_info.extend(response.deposit_info); } @@ -157,13 +174,9 @@ impl ValidityProverClientInterface for ValidityProverClient { tx_tree_root: Bytes32, ) -> Result, ServerError> { let query = GetBlockNumberByTxTreeRootQuery { tx_tree_root }; - let response: GetBlockNumberByTxTreeRootResponse = get_request( - &self.client, - &self.base_url, - "/get-block-number-by-tx-tree-root", - Some(&query), - ) - .await?; + let response: GetBlockNumberByTxTreeRootResponse = self + .get_with_limit("/get-block-number-by-tx-tree-root", Some(&query)) + .await?; Ok(response.block_number) } @@ -177,13 +190,9 @@ impl ValidityProverClientInterface for ValidityProverClient { let request = GetBlockNumberByTxTreeRootBatchRequest { tx_tree_roots: chunk.to_vec(), }; - let response: GetBlockNumberByTxTreeRootBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-block-number-by-tx-tree-root-batch", - Some(&request), - ) - .await?; + let response: GetBlockNumberByTxTreeRootBatchResponse = self + .post_with_limit("/get-block-number-by-tx-tree-root-batch", Some(&request)) + .await?; all_block_numbers.extend(response.block_numbers); } @@ -195,13 +204,9 @@ impl ValidityProverClientInterface for ValidityProverClient { block_number: u32, ) -> Result { let query = GetValidityWitnessQuery { block_number }; - let response: GetValidityWitnessResponse = get_request( - &self.client, - &self.base_url, - "/get-validity-witness", - Some(&query), - ) - .await?; + let response: GetValidityWitnessResponse = self + .get_with_limit("/get-validity-witness", Some(&query)) + .await?; Ok(response.validity_witness) } @@ -210,13 +215,9 @@ impl ValidityProverClientInterface for ValidityProverClient { block_number: u32, ) -> Result, ServerError> { let query = GetValidityProofQuery { block_number }; - let response: GetValidityProofResponse = get_request( - &self.client, - &self.base_url, - "/get-validity-proof", - Some(&query), - ) - .await?; + let response: GetValidityProofResponse = self + .get_with_limit("/get-validity-proof", Some(&query)) + .await?; let validity_proof = response.validity_proof.decompress().map_err(|e| { ServerError::ProofDecodeError(format!("Failed to decompress proof: {e}")) })?; @@ -232,13 +233,9 @@ impl ValidityProverClientInterface for ValidityProverClient { root_block_number, leaf_block_number, }; - let response: GetBlockMerkleProofResponse = get_request( - &self.client, - &self.base_url, - "/get-block-merkle-proof", - Some(&query), - ) - .await?; + let response: GetBlockMerkleProofResponse = self + .get_with_limit("/get-block-merkle-proof", Some(&query)) + .await?; Ok(response.block_merkle_proof) } @@ -251,25 +248,17 @@ impl ValidityProverClientInterface for ValidityProverClient { block_number, deposit_index, }; - let response: GetDepositMerkleProofResponse = get_request( - &self.client, - &self.base_url, - "/get-deposit-merkle-proof", - Some(&query), - ) - .await?; + let response: GetDepositMerkleProofResponse = self + .get_with_limit("/get-deposit-merkle-proof", Some(&query)) + .await?; Ok(response.deposit_merkle_proof) } async fn get_account_info(&self, pubkey: U256) -> Result { let query = GetAccountInfoQuery { pubkey }; - let response: GetAccountInfoResponse = get_request( - &self.client, - &self.base_url, - "/get-account-info", - Some(&query), - ) - .await?; + let response: GetAccountInfoResponse = self + .get_with_limit("/get-account-info", Some(&query)) + .await?; Ok(response.account_info) } @@ -283,16 +272,15 @@ impl ValidityProverClientInterface for ValidityProverClient { let request = GetAccountInfoBatchRequest { pubkeys: chunk.to_vec(), }; - let response: GetAccountInfoBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-account-info-batch", - Some(&request), - ) - .await?; + let response: GetAccountInfoBatchResponse = self + .post_with_limit("/get-account-info-batch", Some(&request)) + .await?; all_account_info.extend(response.account_info); } Ok(all_account_info) } } + +const DEFAULT_RPS: f64 = 1.0; +const DEFAULT_BURST_MULTIPLIER: u32 = 2; diff --git a/interfaces/src/api/store_vault_server/interface.rs b/interfaces/src/api/store_vault_server/interface.rs index 4b086612..7c0c828a 100644 --- a/interfaces/src/api/store_vault_server/interface.rs +++ b/interfaces/src/api/store_vault_server/interface.rs @@ -10,7 +10,7 @@ use crate::{ use super::types::{DataWithMetaData, MetaDataCursor, MetaDataCursorResponse}; -pub const MAX_BATCH_SIZE: usize = 256; +pub const MAX_BATCH_SIZE: usize = 64; #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/interfaces/src/api/validity_prover/interface.rs b/interfaces/src/api/validity_prover/interface.rs index dadc4906..689ac402 100644 --- a/interfaces/src/api/validity_prover/interface.rs +++ b/interfaces/src/api/validity_prover/interface.rs @@ -19,7 +19,7 @@ type F = GoldilocksField; type C = PoseidonGoldilocksConfig; const D: usize = 2; -pub const MAX_BATCH_SIZE: usize = 128; +pub const MAX_BATCH_SIZE: usize = 32; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")]