From ad3ce7458ce9dfc2772a9813fbafed3daebf52bb Mon Sep 17 00:00:00 2001 From: kbizikav <132550763+kbizikav@users.noreply.github.com> Date: Mon, 20 Oct 2025 22:47:27 +0700 Subject: [PATCH 1/6] feat: add rate limit & reduce max batch size --- .gitignore | 2 +- AGENTS.md | 21 ++ client-sdk/src/external_api/s3_store_vault.rs | 87 +++++---- .../src/external_api/store_vault_server.rs | 81 ++++---- client-sdk/src/external_api/utils/mod.rs | 1 + .../src/external_api/utils/rate_limit.rs | 136 +++++++++++++ .../src/external_api/validity_prover.rs | 182 ++++++++---------- .../src/api/store_vault_server/interface.rs | 2 +- .../src/api/validity_prover/interface.rs | 2 +- 9 files changed, 336 insertions(+), 178 deletions(-) create mode 100644 AGENTS.md create mode 100644 client-sdk/src/external_api/utils/rate_limit.rs diff --git a/.gitignore b/.gitignore index 9f31dd40..d38d35eb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target -CLAUDE.md \ No newline at end of file +CLAUDE.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..17007cc4 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,21 @@ +# Repository Guidelines + +## Project Structure & Module Organization +INTMAX2 is a Rust workspace (`Cargo.toml`) composed of service crates such as `block-builder`, `balance-prover`, `validity-prover`, and `withdrawal-server`, each with its own `src/` for application code. Shared logic lives in `common`, `server-common`, and `interfaces`, while developer tooling sits in `cli`, `client-sdk`, and `scripts/`. Integration and end-to-end tests reside under `tests/`, with contract deployment flows in `tests/tests` and supporting fixtures in `tests/src`. Docker resources live in `docker/` and `compose.yml`, and SSL fixtures for Redis tests are under `redis-test-certs/`. + +## Build, Test, and Development Commands +- `cargo build --workspace` — compile all crates; add `--release` when benchmarking provers. +- `cargo fmt --all` and `cargo clippy --workspace --all-targets --all-features` — enforce formatting and linting before commits. +- `cargo test --workspace --all-features` — run unit tests plus integration suites such as `deploy_contracts.rs`. +- `lefthook run pre-commit` — execute the same format, lint, and typo checks that CI expects. +- `docker compose up -d` — start PostgreSQL, Redis, and ancillary services required by the vault and prover crates. +- `(cd validity-prover && sqlx database setup)` (repeat for other DB-backed services) — provision local schemas before running servers. + +## Coding Style & Naming Conventions +Rust files follow `rustfmt` defaults with crate-level import grouping (`rustfmt.toml` sets `imports_granularity = "Crate"`). Use four-space indentation, `snake_case` for modules and functions, and `UpperCamelCase` for types and enums. Prefer `?` over `unwrap()` in async services, and surface shared errors via the common crate. Keep configuration files (`.env`) out of version control; copy from `.env.example` when needed. + +## Testing Guidelines +Unit tests live alongside source in each crate; integration scenarios use the `tests/tests` suite. Name new tests after the behavior under scrutiny (e.g. `handles_empty_batch`). When a change touches database or prover flows, add an end-to-end case in `tests/tests/*` and ensure the relevant SQL migrations have an accompanying `sqlx prepare` update. Run `cargo test --workspace --all-features` before submitting, and include logs for any ignored tests you enable. + +## Commit & Pull Request Guidelines +Adopt Conventional Commits as seen in history (`feat:`, `fix:`, `chore:`) and keep summaries under 72 characters. Branch from `dev`, reference issues in the body (`Closes #123`), and include context such as CLI commands or screenshots for UI-facing SDK updates. Before opening a PR, confirm format, lint, and test commands succeed locally and note any follow-up work in the description. Tag reviewers for services you touched (e.g. prover team for `validity-prover` changes) and wait for dual approvals before merging. diff --git a/client-sdk/src/external_api/s3_store_vault.rs b/client-sdk/src/external_api/s3_store_vault.rs index c5f80bca..7be0211f 100644 --- a/client-sdk/src/external_api/s3_store_vault.rs +++ b/client-sdk/src/external_api/s3_store_vault.rs @@ -1,6 +1,7 @@ use super::utils::query::post_request; use crate::external_api::utils::{ query::{build_client, REQWEST_TIMEOUT}, + rate_limit::{limiter_from_env, RequestRateLimiter}, retry::with_retry, }; use async_trait::async_trait; @@ -26,14 +27,19 @@ use intmax2_interfaces::{ }; use intmax2_zkp::ethereum_types::bytes32::Bytes32; use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::Arc; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly +const STORE_VAULT_DEFAULT_RPS: u32 = 10; +const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] pub struct S3StoreVaultClient { client: Client, base_url: String, + rate_limiter: Arc, } impl S3StoreVaultClient { @@ -41,8 +47,30 @@ impl S3StoreVaultClient { S3StoreVaultClient { client: build_client(), base_url: base_url.to_string(), + rate_limiter: limiter_from_env( + "STORE_VAULT_MAX_RPS", + "STORE_VAULT_MAX_BURST", + STORE_VAULT_DEFAULT_RPS, + STORE_VAULT_DEFAULT_BURST_MULTIPLIER, + ), } } + + async fn post_with_limit( + &self, + endpoint: &str, + body: Option<&B>, + ) -> Result + where + B: Serialize, + R: DeserializeOwned, + { + self.rate_limiter + .run(1, || async { + post_request(&self.client, &self.base_url, endpoint, body).await + }) + .await + } } #[async_trait(?Send)] @@ -62,13 +90,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { digest, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3PreSaveSnapshotResponse = post_request( - &self.client, - &self.base_url, - "/pre-save-snapshot", - Some(&request_with_auth), - ) - .await?; + let response: S3PreSaveSnapshotResponse = self + .post_with_limit("/pre-save-snapshot", Some(&request_with_auth)) + .await?; // upload data to s3 upload_s3(&self.client, &response.presigned_url, data).await?; @@ -81,13 +105,8 @@ impl StoreVaultClientInterface for S3StoreVaultClient { digest, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let () = post_request( - &self.client, - &self.base_url, - "/save-snapshot", - Some(&request_with_auth), - ) - .await?; + self.post_with_limit::<_, ()>("/save-snapshot", Some(&request_with_auth)) + .await?; Ok(()) } @@ -103,13 +122,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3GetSnapshotResponse = post_request( - &self.client, - &self.base_url, - "/get-snapshot", - Some(&request_with_auth), - ) - .await?; + let response: S3GetSnapshotResponse = self + .post_with_limit("/get-snapshot", Some(&request_with_auth)) + .await?; match response.presigned_url { Some(url) => { @@ -139,13 +154,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { let digests = data.iter().map(|entry| entry.digest).collect::>(); let request = S3SaveDataBatchRequest { data }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3SaveDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/save-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: S3SaveDataBatchResponse = self + .post_with_limit("/save-data-batch", Some(&request_with_auth)) + .await?; let data = chunk .iter() @@ -173,13 +184,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: S3GetDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: S3GetDataBatchResponse = self + .post_with_limit("/get-data-batch", Some(&request_with_auth)) + .await?; let urls = response .presigned_urls_with_meta .iter() @@ -239,13 +246,9 @@ impl StoreVaultClientInterface for S3StoreVaultClient { }, auth: auth.clone(), }; - let response: S3GetDataSequenceResponse = post_request( - &self.client, - &self.base_url, - "/get-data-sequence", - Some(&request_with_auth), - ) - .await?; + let response: S3GetDataSequenceResponse = self + .post_with_limit("/get-data-sequence", Some(&request_with_auth)) + .await?; let urls = response .presigned_urls_with_meta diff --git a/client-sdk/src/external_api/store_vault_server.rs b/client-sdk/src/external_api/store_vault_server.rs index 756a8002..f1fa9bcc 100644 --- a/client-sdk/src/external_api/store_vault_server.rs +++ b/client-sdk/src/external_api/store_vault_server.rs @@ -19,18 +19,26 @@ use intmax2_interfaces::{ }; use intmax2_zkp::ethereum_types::bytes32::Bytes32; use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::Arc; -use crate::external_api::utils::query::build_client; +use crate::external_api::utils::{ + query::build_client, + rate_limit::{limiter_from_env, RequestRateLimiter}, +}; use super::utils::query::post_request; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly +const STORE_VAULT_DEFAULT_RPS: u32 = 10; +const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] pub struct StoreVaultServerClient { client: Client, base_url: String, + rate_limiter: Arc, } impl StoreVaultServerClient { @@ -38,8 +46,30 @@ impl StoreVaultServerClient { StoreVaultServerClient { client: build_client(), base_url: base_url.to_string(), + rate_limiter: limiter_from_env( + "STORE_VAULT_MAX_RPS", + "STORE_VAULT_MAX_BURST", + STORE_VAULT_DEFAULT_RPS, + STORE_VAULT_DEFAULT_BURST_MULTIPLIER, + ), } } + + async fn post_with_limit( + &self, + endpoint: &str, + body: Option<&B>, + ) -> Result + where + B: Serialize, + R: DeserializeOwned, + { + self.rate_limiter + .run(1, || async { + post_request(&self.client, &self.base_url, endpoint, body).await + }) + .await + } } #[async_trait(?Send)] @@ -59,13 +89,8 @@ impl StoreVaultClientInterface for StoreVaultServerClient { prev_digest, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - post_request::<_, ()>( - &self.client, - &self.base_url, - "/save-snapshot", - Some(&request_with_auth), - ) - .await?; + self.post_with_limit::<_, ()>("/save-snapshot", Some(&request_with_auth)) + .await?; Ok(()) } @@ -80,13 +105,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: GetSnapshotResponse = post_request( - &self.client, - &self.base_url, - "/get-snapshot", - Some(&request_with_auth), - ) - .await?; + let response: GetSnapshotResponse = self + .post_with_limit("/get-snapshot", Some(&request_with_auth)) + .await?; Ok(response.data) } @@ -102,13 +123,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { data: chunk.to_vec(), }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: SaveDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/save-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: SaveDataBatchResponse = self + .post_with_limit("/save-data-batch", Some(&request_with_auth)) + .await?; all_digests.extend(response.digests); } Ok(all_digests) @@ -129,13 +146,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { pubkey: key.pubkey, }; let request_with_auth = request.sign(view_key, TIME_TO_EXPIRY); - let response: GetDataBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-data-batch", - Some(&request_with_auth), - ) - .await?; + let response: GetDataBatchResponse = self + .post_with_limit("/get-data-batch", Some(&request_with_auth)) + .await?; all_data.extend(response.data); } Ok(all_data) @@ -177,13 +190,9 @@ impl StoreVaultClientInterface for StoreVaultServerClient { }, auth: auth.clone(), }; - let response: GetDataSequenceResponse = post_request( - &self.client, - &self.base_url, - "/get-data-sequence", - Some(&request_with_auth), - ) - .await?; + let response: GetDataSequenceResponse = self + .post_with_limit("/get-data-sequence", Some(&request_with_auth)) + .await?; Ok((response.data, response.cursor_response)) } } diff --git a/client-sdk/src/external_api/utils/mod.rs b/client-sdk/src/external_api/utils/mod.rs index 72853cbc..6a391877 100644 --- a/client-sdk/src/external_api/utils/mod.rs +++ b/client-sdk/src/external_api/utils/mod.rs @@ -1,3 +1,4 @@ pub mod query; +pub mod rate_limit; pub mod retry; pub mod time; diff --git a/client-sdk/src/external_api/utils/rate_limit.rs b/client-sdk/src/external_api/utils/rate_limit.rs new file mode 100644 index 00000000..d2fc4222 --- /dev/null +++ b/client-sdk/src/external_api/utils/rate_limit.rs @@ -0,0 +1,136 @@ +use std::{f64, future::Future, sync::Arc}; + +use tokio::{ + sync::Mutex, + time::{Duration, Instant}, +}; + +/// Simple token bucket rate limiter that works on native and WASM targets. +#[derive(Debug)] +pub struct RequestRateLimiter { + state: Mutex, + refill_rate_per_sec: f64, + capacity: f64, +} + +#[derive(Debug)] +struct LimiterState { + tokens: f64, + last_refill: Instant, +} + +impl RequestRateLimiter { + /// Create a rate limiter with a maximum burst (`capacity`) and steady refill rate (`permits_per_second`). + /// + /// # Panics + /// Panics if either parameter is zero. + pub fn new(permits_per_second: u32, capacity: u32) -> Self { + assert!( + permits_per_second > 0, + "permits_per_second must be positive" + ); + assert!(capacity > 0, "capacity must be positive"); + + let refill_rate_per_sec = permits_per_second as f64; + let capacity = capacity as f64; + + RequestRateLimiter { + state: Mutex::new(LimiterState { + tokens: capacity, + last_refill: Instant::now(), + }), + refill_rate_per_sec, + capacity, + } + } + + /// Acquire a single permit before proceeding. + pub async fn acquire(&self) { + self.acquire_permits(1).await; + } + + /// Acquire `permits` units before proceeding. + pub async fn acquire_permits(&self, permits: u32) { + assert!(permits > 0, "permits must be positive"); + let permits = permits as f64; + + loop { + let mut state = self.state.lock().await; + let now = Instant::now(); + let elapsed = now - state.last_refill; + + if elapsed > Duration::from_secs(0) { + let refill_amount = elapsed.as_secs_f64() * self.refill_rate_per_sec; + state.tokens = (state.tokens + refill_amount).min(self.capacity); + state.last_refill = now; + } + + if state.tokens >= permits { + state.tokens -= permits; + break; + } + + let deficit = permits - state.tokens; + let wait_seconds = deficit / self.refill_rate_per_sec; + let sleep_duration = Duration::from_secs_f64(wait_seconds.clamp(0.0, f64::MAX)); + + // Drop the lock before awaiting. + drop(state); + tokio::time::sleep(sleep_duration).await; + } + } + + /// Acquire permits, then execute the provided future. + pub async fn run(&self, permits: u32, op: F) -> T + where + F: FnOnce() -> Fut, + Fut: Future, + { + self.acquire_permits(permits).await; + op().await + } +} + +pub fn limiter_from_env( + permits_var: &str, + burst_var: &str, + default_rps: u32, + default_burst_multiplier: u32, +) -> Arc { + let permits_per_second = std::env::var(permits_var) + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|v| *v > 0) + .unwrap_or(default_rps); + + let default_capacity = permits_per_second.saturating_mul(default_burst_multiplier); + + let capacity = std::env::var(burst_var) + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|v| *v > 0) + .unwrap_or(default_capacity.max(permits_per_second)); + + Arc::new(RequestRateLimiter::new(permits_per_second, capacity)) +} + +#[cfg(test)] +mod tests { + use super::*; + #[tokio::test] + async fn waits_until_tokens_refilled() { + let limiter = RequestRateLimiter::new(5, 5); + + // Consume entire bucket instantly. + limiter.acquire_permits(5).await; + + let before = Instant::now(); + limiter.acquire().await; + let waited = Instant::now() - before; + + assert!( + waited >= Duration::from_millis(180), + "expected to wait for refill, but only waited {waited:?}", + ); + } +} diff --git a/client-sdk/src/external_api/validity_prover.rs b/client-sdk/src/external_api/validity_prover.rs index fbe5260d..f804b77c 100644 --- a/client-sdk/src/external_api/validity_prover.rs +++ b/client-sdk/src/external_api/validity_prover.rs @@ -29,10 +29,13 @@ use plonky2::{ plonk::{config::PoseidonGoldilocksConfig, proof::ProofWithPublicInputs}, }; use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::Arc; -use crate::external_api::utils::query::build_client; - -use super::utils::query::{get_request, post_request}; +use crate::external_api::utils::{ + query::{build_client, get_request, post_request}, + rate_limit::{limiter_from_env, RequestRateLimiter}, +}; type F = GoldilocksField; type C = PoseidonGoldilocksConfig; @@ -42,6 +45,7 @@ const D: usize = 2; pub struct ValidityProverClient { client: Client, base_url: String, + rate_limiter: Arc, } impl ValidityProverClient { @@ -49,43 +53,68 @@ impl ValidityProverClient { ValidityProverClient { client: build_client(), base_url: base_url.to_string(), + rate_limiter: limiter_from_env( + "VALIDITY_PROVER_MAX_RPS", + "VALIDITY_PROVER_MAX_BURST", + DEFAULT_RPS, + DEFAULT_BURST_MULTIPLIER, + ), } } + + async fn get_with_limit( + &self, + endpoint: &str, + query: Option<&Q>, + ) -> Result + where + Q: Serialize, + R: DeserializeOwned, + { + self.rate_limiter.acquire().await; + get_request(&self.client, &self.base_url, endpoint, query).await + } + + async fn post_with_limit( + &self, + endpoint: &str, + body: Option<&B>, + ) -> Result + where + B: Serialize, + R: DeserializeOwned, + { + self.rate_limiter.acquire().await; + post_request(&self.client, &self.base_url, endpoint, body).await + } } #[async_trait(?Send)] impl ValidityProverClientInterface for ValidityProverClient { async fn get_block_number(&self) -> Result { let response: GetBlockNumberResponse = - get_request::<(), _>(&self.client, &self.base_url, "/block-number", None).await?; + self.get_with_limit::<(), _>("/block-number", None).await?; Ok(response.block_number) } async fn get_validity_proof_block_number(&self) -> Result { - let response: GetBlockNumberResponse = get_request::<(), _>( - &self.client, - &self.base_url, - "/validity-proof-block-number", - None, - ) - .await?; + let response: GetBlockNumberResponse = self + .get_with_limit::<(), _>("/validity-proof-block-number", None) + .await?; Ok(response.block_number) } async fn get_next_deposit_index(&self) -> Result { - let response: GetNextDepositIndexResponse = - get_request::<(), _>(&self.client, &self.base_url, "/next-deposit-index", None).await?; + let response: GetNextDepositIndexResponse = self + .get_with_limit::<(), _>("/next-deposit-index", None) + .await?; Ok(response.deposit_index) } async fn get_latest_included_deposit_index(&self) -> Result, ServerError> { - let response: GetLatestIncludedDepositIndexResponse = get_request::<(), _>( - &self.client, - &self.base_url, - "/latest-included-deposit-index", - None, - ) - .await?; + let response: GetLatestIncludedDepositIndexResponse = self + .get_with_limit::<(), _>("/latest-included-deposit-index", None) + .await?; Ok(response.deposit_index) } @@ -102,13 +131,9 @@ impl ValidityProverClientInterface for ValidityProverClient { leaf_block_number, is_prev_account_tree, }; - let response: GetUpdateWitnessResponse = get_request( - &self.client, - &self.base_url, - "/get-update-witness", - Some(&query), - ) - .await?; + let response: GetUpdateWitnessResponse = self + .get_with_limit("/get-update-witness", Some(&query)) + .await?; Ok(response.update_witness) } @@ -117,13 +142,9 @@ impl ValidityProverClientInterface for ValidityProverClient { pubkey_salt_hash: Bytes32, ) -> Result, ServerError> { let query = GetDepositInfoQuery { pubkey_salt_hash }; - let response: GetDepositInfoResponse = get_request( - &self.client, - &self.base_url, - "/get-deposit-info", - Some(&query), - ) - .await?; + let response: GetDepositInfoResponse = self + .get_with_limit("/get-deposit-info", Some(&query)) + .await?; Ok(response.deposit_info) } @@ -138,13 +159,9 @@ impl ValidityProverClientInterface for ValidityProverClient { pubkey_salt_hashes: chunk.to_vec(), }; - let response: GetDepositInfoBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-deposit-info-batch", - Some(&request), - ) - .await?; + let response: GetDepositInfoBatchResponse = self + .post_with_limit("/get-deposit-info-batch", Some(&request)) + .await?; all_deposit_info.extend(response.deposit_info); } @@ -157,13 +174,9 @@ impl ValidityProverClientInterface for ValidityProverClient { tx_tree_root: Bytes32, ) -> Result, ServerError> { let query = GetBlockNumberByTxTreeRootQuery { tx_tree_root }; - let response: GetBlockNumberByTxTreeRootResponse = get_request( - &self.client, - &self.base_url, - "/get-block-number-by-tx-tree-root", - Some(&query), - ) - .await?; + let response: GetBlockNumberByTxTreeRootResponse = self + .get_with_limit("/get-block-number-by-tx-tree-root", Some(&query)) + .await?; Ok(response.block_number) } @@ -177,13 +190,9 @@ impl ValidityProverClientInterface for ValidityProverClient { let request = GetBlockNumberByTxTreeRootBatchRequest { tx_tree_roots: chunk.to_vec(), }; - let response: GetBlockNumberByTxTreeRootBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-block-number-by-tx-tree-root-batch", - Some(&request), - ) - .await?; + let response: GetBlockNumberByTxTreeRootBatchResponse = self + .post_with_limit("/get-block-number-by-tx-tree-root-batch", Some(&request)) + .await?; all_block_numbers.extend(response.block_numbers); } @@ -195,13 +204,9 @@ impl ValidityProverClientInterface for ValidityProverClient { block_number: u32, ) -> Result { let query = GetValidityWitnessQuery { block_number }; - let response: GetValidityWitnessResponse = get_request( - &self.client, - &self.base_url, - "/get-validity-witness", - Some(&query), - ) - .await?; + let response: GetValidityWitnessResponse = self + .get_with_limit("/get-validity-witness", Some(&query)) + .await?; Ok(response.validity_witness) } @@ -210,13 +215,9 @@ impl ValidityProverClientInterface for ValidityProverClient { block_number: u32, ) -> Result, ServerError> { let query = GetValidityProofQuery { block_number }; - let response: GetValidityProofResponse = get_request( - &self.client, - &self.base_url, - "/get-validity-proof", - Some(&query), - ) - .await?; + let response: GetValidityProofResponse = self + .get_with_limit("/get-validity-proof", Some(&query)) + .await?; let validity_proof = response.validity_proof.decompress().map_err(|e| { ServerError::ProofDecodeError(format!("Failed to decompress proof: {e}")) })?; @@ -232,13 +233,9 @@ impl ValidityProverClientInterface for ValidityProverClient { root_block_number, leaf_block_number, }; - let response: GetBlockMerkleProofResponse = get_request( - &self.client, - &self.base_url, - "/get-block-merkle-proof", - Some(&query), - ) - .await?; + let response: GetBlockMerkleProofResponse = self + .get_with_limit("/get-block-merkle-proof", Some(&query)) + .await?; Ok(response.block_merkle_proof) } @@ -251,25 +248,17 @@ impl ValidityProverClientInterface for ValidityProverClient { block_number, deposit_index, }; - let response: GetDepositMerkleProofResponse = get_request( - &self.client, - &self.base_url, - "/get-deposit-merkle-proof", - Some(&query), - ) - .await?; + let response: GetDepositMerkleProofResponse = self + .get_with_limit("/get-deposit-merkle-proof", Some(&query)) + .await?; Ok(response.deposit_merkle_proof) } async fn get_account_info(&self, pubkey: U256) -> Result { let query = GetAccountInfoQuery { pubkey }; - let response: GetAccountInfoResponse = get_request( - &self.client, - &self.base_url, - "/get-account-info", - Some(&query), - ) - .await?; + let response: GetAccountInfoResponse = self + .get_with_limit("/get-account-info", Some(&query)) + .await?; Ok(response.account_info) } @@ -283,16 +272,15 @@ impl ValidityProverClientInterface for ValidityProverClient { let request = GetAccountInfoBatchRequest { pubkeys: chunk.to_vec(), }; - let response: GetAccountInfoBatchResponse = post_request( - &self.client, - &self.base_url, - "/get-account-info-batch", - Some(&request), - ) - .await?; + let response: GetAccountInfoBatchResponse = self + .post_with_limit("/get-account-info-batch", Some(&request)) + .await?; all_account_info.extend(response.account_info); } Ok(all_account_info) } } + +const DEFAULT_RPS: u32 = 15; +const DEFAULT_BURST_MULTIPLIER: u32 = 2; diff --git a/interfaces/src/api/store_vault_server/interface.rs b/interfaces/src/api/store_vault_server/interface.rs index 4b086612..7c0c828a 100644 --- a/interfaces/src/api/store_vault_server/interface.rs +++ b/interfaces/src/api/store_vault_server/interface.rs @@ -10,7 +10,7 @@ use crate::{ use super::types::{DataWithMetaData, MetaDataCursor, MetaDataCursorResponse}; -pub const MAX_BATCH_SIZE: usize = 256; +pub const MAX_BATCH_SIZE: usize = 64; #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/interfaces/src/api/validity_prover/interface.rs b/interfaces/src/api/validity_prover/interface.rs index dadc4906..689ac402 100644 --- a/interfaces/src/api/validity_prover/interface.rs +++ b/interfaces/src/api/validity_prover/interface.rs @@ -19,7 +19,7 @@ type F = GoldilocksField; type C = PoseidonGoldilocksConfig; const D: usize = 2; -pub const MAX_BATCH_SIZE: usize = 128; +pub const MAX_BATCH_SIZE: usize = 32; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] From 0cf111bd958ede735b46e91ccae0ab781d97ae58 Mon Sep 17 00:00:00 2001 From: kbizikav <132550763+kbizikav@users.noreply.github.com> Date: Tue, 21 Oct 2025 00:06:16 +0700 Subject: [PATCH 2/6] feat: use wasm time --- Cargo.lock | 13 ++++++++++ client-sdk/Cargo.toml | 3 +++ client-sdk/src/external_api/s3_store_vault.rs | 2 +- .../src/external_api/store_vault_server.rs | 2 +- .../src/external_api/utils/rate_limit.rs | 24 ++++++++++++++----- .../src/external_api/validity_prover.rs | 2 +- 6 files changed, 37 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 95343ff7..536e5705 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3869,6 +3869,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "intmax2-cli" version = "0.1.33" @@ -3925,6 +3937,7 @@ dependencies = [ "gloo-timers", "hashbrown 0.15.4", "hex", + "instant", "intmax2-interfaces", "intmax2-zkp", "itertools 0.14.0", diff --git a/client-sdk/Cargo.toml b/client-sdk/Cargo.toml index 9777aade..a58c0bfe 100644 --- a/client-sdk/Cargo.toml +++ b/client-sdk/Cargo.toml @@ -47,3 +47,6 @@ futures = "0.3.31" csv = "1.3.1" gloo-timers = { version = "0.3.0", features = ["futures"] } coins-bip39 = "0.12.0" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +instant = { version = "0.1.13", features = ["wasm-bindgen"] } diff --git a/client-sdk/src/external_api/s3_store_vault.rs b/client-sdk/src/external_api/s3_store_vault.rs index 7be0211f..98bb005f 100644 --- a/client-sdk/src/external_api/s3_store_vault.rs +++ b/client-sdk/src/external_api/s3_store_vault.rs @@ -32,7 +32,7 @@ use std::sync::Arc; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly -const STORE_VAULT_DEFAULT_RPS: u32 = 10; +const STORE_VAULT_DEFAULT_RPS: u32 = 1; const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] diff --git a/client-sdk/src/external_api/store_vault_server.rs b/client-sdk/src/external_api/store_vault_server.rs index f1fa9bcc..1bc3b87f 100644 --- a/client-sdk/src/external_api/store_vault_server.rs +++ b/client-sdk/src/external_api/store_vault_server.rs @@ -31,7 +31,7 @@ use super::utils::query::post_request; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly -const STORE_VAULT_DEFAULT_RPS: u32 = 10; +const STORE_VAULT_DEFAULT_RPS: u32 = 1; const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] diff --git a/client-sdk/src/external_api/utils/rate_limit.rs b/client-sdk/src/external_api/utils/rate_limit.rs index d2fc4222..eba2ced1 100644 --- a/client-sdk/src/external_api/utils/rate_limit.rs +++ b/client-sdk/src/external_api/utils/rate_limit.rs @@ -1,9 +1,21 @@ -use std::{f64, future::Future, sync::Arc}; +use std::{f64, future::Future, sync::Arc, time::Duration}; -use tokio::{ - sync::Mutex, - time::{Duration, Instant}, -}; +use tokio::sync::Mutex; + +#[cfg(target_arch = "wasm32")] +use instant::Instant; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::Instant; + +#[cfg(target_arch = "wasm32")] +async fn sleep(duration: Duration) { + gloo_timers::future::sleep(duration).await; +} + +#[cfg(not(target_arch = "wasm32"))] +async fn sleep(duration: Duration) { + tokio::time::sleep(duration).await; +} /// Simple token bucket rate limiter that works on native and WASM targets. #[derive(Debug)] @@ -76,7 +88,7 @@ impl RequestRateLimiter { // Drop the lock before awaiting. drop(state); - tokio::time::sleep(sleep_duration).await; + sleep(sleep_duration).await; } } diff --git a/client-sdk/src/external_api/validity_prover.rs b/client-sdk/src/external_api/validity_prover.rs index f804b77c..09df4e46 100644 --- a/client-sdk/src/external_api/validity_prover.rs +++ b/client-sdk/src/external_api/validity_prover.rs @@ -282,5 +282,5 @@ impl ValidityProverClientInterface for ValidityProverClient { } } -const DEFAULT_RPS: u32 = 15; +const DEFAULT_RPS: u32 = 1; const DEFAULT_BURST_MULTIPLIER: u32 = 2; From 1696b1cf3a0e70b07e415b126e49163ec7d5a682 Mon Sep 17 00:00:00 2001 From: kbizikav <132550763+kbizikav@users.noreply.github.com> Date: Tue, 21 Oct 2025 00:15:12 +0700 Subject: [PATCH 3/6] feat: longer rate limit --- client-sdk/src/external_api/s3_store_vault.rs | 2 +- .../src/external_api/store_vault_server.rs | 2 +- .../src/external_api/utils/rate_limit.rs | 81 +++++++++++++++---- .../src/external_api/validity_prover.rs | 2 +- 4 files changed, 69 insertions(+), 18 deletions(-) diff --git a/client-sdk/src/external_api/s3_store_vault.rs b/client-sdk/src/external_api/s3_store_vault.rs index 98bb005f..0df99b2f 100644 --- a/client-sdk/src/external_api/s3_store_vault.rs +++ b/client-sdk/src/external_api/s3_store_vault.rs @@ -32,7 +32,7 @@ use std::sync::Arc; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly -const STORE_VAULT_DEFAULT_RPS: u32 = 1; +const STORE_VAULT_DEFAULT_RPS: f64 = 1.0; const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] diff --git a/client-sdk/src/external_api/store_vault_server.rs b/client-sdk/src/external_api/store_vault_server.rs index 1bc3b87f..42d6d71f 100644 --- a/client-sdk/src/external_api/store_vault_server.rs +++ b/client-sdk/src/external_api/store_vault_server.rs @@ -31,7 +31,7 @@ use super::utils::query::post_request; const TIME_TO_EXPIRY: u64 = 60; // 1 minute for normal requests const TIME_TO_EXPIRY_READONLY: u64 = 60 * 60 * 24; // 24 hours for readonly -const STORE_VAULT_DEFAULT_RPS: u32 = 1; +const STORE_VAULT_DEFAULT_RPS: f64 = 1.0; const STORE_VAULT_DEFAULT_BURST_MULTIPLIER: u32 = 2; #[derive(Debug, Clone)] diff --git a/client-sdk/src/external_api/utils/rate_limit.rs b/client-sdk/src/external_api/utils/rate_limit.rs index eba2ced1..7b3ec080 100644 --- a/client-sdk/src/external_api/utils/rate_limit.rs +++ b/client-sdk/src/external_api/utils/rate_limit.rs @@ -36,22 +36,19 @@ impl RequestRateLimiter { /// /// # Panics /// Panics if either parameter is zero. - pub fn new(permits_per_second: u32, capacity: u32) -> Self { + pub fn new(permits_per_second: f64, capacity: f64) -> Self { assert!( - permits_per_second > 0, + permits_per_second > 0.0, "permits_per_second must be positive" ); - assert!(capacity > 0, "capacity must be positive"); - - let refill_rate_per_sec = permits_per_second as f64; - let capacity = capacity as f64; + assert!(capacity > 0.0, "capacity must be positive"); RequestRateLimiter { state: Mutex::new(LimiterState { tokens: capacity, last_refill: Instant::now(), }), - refill_rate_per_sec, + refill_rate_per_sec: permits_per_second, capacity, } } @@ -103,25 +100,61 @@ impl RequestRateLimiter { } } +fn parse_permits_per_second(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + let lower = trimmed.to_ascii_lowercase(); + + if let Some(stripped) = lower.strip_suffix("ms") { + let millis: f64 = stripped.trim().parse().ok()?; + if millis <= 0.0 { + return None; + } + return Some(1_000.0 / millis); + } + + if let Some(stripped) = lower.strip_suffix("s") { + let seconds: f64 = stripped.trim().parse().ok()?; + if seconds <= 0.0 { + return None; + } + return Some(1.0 / seconds); + } + + trimmed.parse::().ok().filter(|v| *v > 0.0) +} + +fn parse_capacity(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + trimmed.parse::().ok().filter(|value| *value > 0.0) +} + pub fn limiter_from_env( permits_var: &str, burst_var: &str, - default_rps: u32, + default_rps: f64, default_burst_multiplier: u32, ) -> Arc { let permits_per_second = std::env::var(permits_var) .ok() - .and_then(|value| value.parse::().ok()) - .filter(|v| *v > 0) + .and_then(|value| parse_permits_per_second(&value)) + .filter(|v| *v > 0.0) .unwrap_or(default_rps); - let default_capacity = permits_per_second.saturating_mul(default_burst_multiplier); + let default_capacity = + (permits_per_second * default_burst_multiplier as f64).max(permits_per_second); let capacity = std::env::var(burst_var) .ok() - .and_then(|value| value.parse::().ok()) - .filter(|v| *v > 0) - .unwrap_or(default_capacity.max(permits_per_second)); + .and_then(|value| parse_capacity(&value)) + .filter(|v| *v > 0.0) + .unwrap_or(default_capacity); Arc::new(RequestRateLimiter::new(permits_per_second, capacity)) } @@ -131,7 +164,7 @@ mod tests { use super::*; #[tokio::test] async fn waits_until_tokens_refilled() { - let limiter = RequestRateLimiter::new(5, 5); + let limiter = RequestRateLimiter::new(5.0, 5.0); // Consume entire bucket instantly. limiter.acquire_permits(5).await; @@ -145,4 +178,22 @@ mod tests { "expected to wait for refill, but only waited {waited:?}", ); } + + #[test] + fn parses_seconds_interval_into_rps() { + let parsed = parse_permits_per_second("2s").expect("should parse"); + assert!((parsed - 0.5).abs() < f64::EPSILON); + } + + #[test] + fn parses_millis_interval_into_rps() { + let parsed = parse_permits_per_second("500ms").expect("should parse"); + assert!((parsed - 2.0).abs() < f64::EPSILON); + } + + #[test] + fn parses_float_rps() { + let parsed = parse_permits_per_second("0.25").expect("should parse"); + assert!((parsed - 0.25).abs() < f64::EPSILON); + } } diff --git a/client-sdk/src/external_api/validity_prover.rs b/client-sdk/src/external_api/validity_prover.rs index 09df4e46..88590115 100644 --- a/client-sdk/src/external_api/validity_prover.rs +++ b/client-sdk/src/external_api/validity_prover.rs @@ -282,5 +282,5 @@ impl ValidityProverClientInterface for ValidityProverClient { } } -const DEFAULT_RPS: u32 = 1; +const DEFAULT_RPS: f64 = 1.0; const DEFAULT_BURST_MULTIPLIER: u32 = 2; From 8b99c19d256cf5fdffbcef32b116cc5774ada758 Mon Sep 17 00:00:00 2001 From: kbizikav <132550763+kbizikav@users.noreply.github.com> Date: Tue, 21 Oct 2025 00:24:52 +0700 Subject: [PATCH 4/6] feat: remove AGENTS.md --- .gitignore | 2 +- AGENTS.md | 21 --------------------- 2 files changed, 1 insertion(+), 22 deletions(-) delete mode 100644 AGENTS.md diff --git a/.gitignore b/.gitignore index d38d35eb..dab176a6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target -CLAUDE.md +AGENTS.md \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md deleted file mode 100644 index 17007cc4..00000000 --- a/AGENTS.md +++ /dev/null @@ -1,21 +0,0 @@ -# Repository Guidelines - -## Project Structure & Module Organization -INTMAX2 is a Rust workspace (`Cargo.toml`) composed of service crates such as `block-builder`, `balance-prover`, `validity-prover`, and `withdrawal-server`, each with its own `src/` for application code. Shared logic lives in `common`, `server-common`, and `interfaces`, while developer tooling sits in `cli`, `client-sdk`, and `scripts/`. Integration and end-to-end tests reside under `tests/`, with contract deployment flows in `tests/tests` and supporting fixtures in `tests/src`. Docker resources live in `docker/` and `compose.yml`, and SSL fixtures for Redis tests are under `redis-test-certs/`. - -## Build, Test, and Development Commands -- `cargo build --workspace` — compile all crates; add `--release` when benchmarking provers. -- `cargo fmt --all` and `cargo clippy --workspace --all-targets --all-features` — enforce formatting and linting before commits. -- `cargo test --workspace --all-features` — run unit tests plus integration suites such as `deploy_contracts.rs`. -- `lefthook run pre-commit` — execute the same format, lint, and typo checks that CI expects. -- `docker compose up -d` — start PostgreSQL, Redis, and ancillary services required by the vault and prover crates. -- `(cd validity-prover && sqlx database setup)` (repeat for other DB-backed services) — provision local schemas before running servers. - -## Coding Style & Naming Conventions -Rust files follow `rustfmt` defaults with crate-level import grouping (`rustfmt.toml` sets `imports_granularity = "Crate"`). Use four-space indentation, `snake_case` for modules and functions, and `UpperCamelCase` for types and enums. Prefer `?` over `unwrap()` in async services, and surface shared errors via the common crate. Keep configuration files (`.env`) out of version control; copy from `.env.example` when needed. - -## Testing Guidelines -Unit tests live alongside source in each crate; integration scenarios use the `tests/tests` suite. Name new tests after the behavior under scrutiny (e.g. `handles_empty_batch`). When a change touches database or prover flows, add an end-to-end case in `tests/tests/*` and ensure the relevant SQL migrations have an accompanying `sqlx prepare` update. Run `cargo test --workspace --all-features` before submitting, and include logs for any ignored tests you enable. - -## Commit & Pull Request Guidelines -Adopt Conventional Commits as seen in history (`feat:`, `fix:`, `chore:`) and keep summaries under 72 characters. Branch from `dev`, reference issues in the body (`Closes #123`), and include context such as CLI commands or screenshots for UI-facing SDK updates. Before opening a PR, confirm format, lint, and test commands succeed locally and note any follow-up work in the description. Tag reviewers for services you touched (e.g. prover team for `validity-prover` changes) and wait for dual approvals before merging. From f9fd5ec352364d457a672f2ee4a80b4d0c230281 Mon Sep 17 00:00:00 2001 From: kbizikav <132550763+kbizikav@users.noreply.github.com> Date: Tue, 21 Oct 2025 00:37:52 +0700 Subject: [PATCH 5/6] fix: typo --- validity-prover/src/app/observer_rpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validity-prover/src/app/observer_rpc.rs b/validity-prover/src/app/observer_rpc.rs index 289c7e84..af04952c 100644 --- a/validity-prover/src/app/observer_rpc.rs +++ b/validity-prover/src/app/observer_rpc.rs @@ -469,7 +469,7 @@ impl SyncEvent for RPCObserver { } } -// This function is used to generate an error for trigging RPC error for testing purposes. +// This function is used to generate an error for triggering RPC error for testing purposes. pub fn generate_error_for_test() -> Result<(), ObserverError> { let error_timestamps = match std::env::var("ERROR_TIMESTAMPS") { Ok(val) => val, From 1d50c4b609878d06281d1dfeebec7ce6f95d08b9 Mon Sep 17 00:00:00 2001 From: Baby Bear <132609968+smallbabybear@users.noreply.github.com> Date: Thu, 30 Oct 2025 09:39:54 +0700 Subject: [PATCH 6/6] Retry 502 error (#470) --- client-sdk/src/external_api/utils/query.rs | 43 ++++++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/client-sdk/src/external_api/utils/query.rs b/client-sdk/src/external_api/utils/query.rs index ac9c49cd..e76df5e9 100644 --- a/client-sdk/src/external_api/utils/query.rs +++ b/client-sdk/src/external_api/utils/query.rs @@ -4,7 +4,7 @@ use intmax2_interfaces::api::error::ServerError; use reqwest::{header, Client, Response, Url}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use crate::external_api::utils::retry::with_retry; +use crate::external_api::utils::{retry::with_retry, time::sleep_for}; /// Timeout for reqwest requests. /// Because WASM only accepts timeout in the request builder, not the client builder, @@ -13,6 +13,8 @@ pub const REQWEST_TIMEOUT: Duration = Duration::from_secs(30); /// Maximum response body size for logging (in characters) const MAX_RESPONSE_LOG_SIZE: usize = 500; +const DEFAULT_BAD_GATEWAY_RETRIES: u32 = 1; +const DEFAULT_BAD_GATEWAY_RETRY_DELAY_SECS: u64 = 30; #[derive(Debug, Deserialize)] struct ErrorResponse { @@ -133,9 +135,42 @@ fn serialize_body_for_logging(body: Option<&B>) -> Result Result { - with_retry(|| async { request_builder.try_clone().unwrap().send().await }) - .await - .map_err(|e| ServerError::NetworkError(e.to_string())) + let max_502_retries: u32 = std::env::var("BAD_GATEWAY_RETRY_LIMIT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_BAD_GATEWAY_RETRIES); + + let retry_delay_secs: u64 = std::env::var("BAD_GATEWAY_RETRY_DELAY_SECS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_BAD_GATEWAY_RETRY_DELAY_SECS); + + let mut bad_gateway_retries: u32 = 0; + + loop { + let response = with_retry(|| async { request_builder.try_clone().unwrap().send().await }) + .await + .map_err(|e| ServerError::NetworkError(e.to_string()))?; + + let status = response.status().as_u16(); + + if status != 502 { + return Ok(response); + } + + if bad_gateway_retries >= max_502_retries { + return Ok(response); + } + + log::warn!( + "Received 502 Bad Gateway from upstream. Waiting {delay}s before retry (attempt {}/{})", + bad_gateway_retries + 1, + max_502_retries, + delay = retry_delay_secs + ); + sleep_for(retry_delay_secs).await; + bad_gateway_retries += 1; + } } /// Handles HTTP response, including error cases and deserialization