diff --git a/Cargo.lock b/Cargo.lock index 702091c5d9..2c6e6bc1ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1032,7 +1032,7 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "api-types" version = "0.1.0" -source = "git+https://github.com/Galxe/gravity-aptos?rev=977f5b9388183c8a14c0ddcb4e2ac9f265d45184#977f5b9388183c8a14c0ddcb4e2ac9f265d45184" +source = "git+https://github.com/Galxe/gravity-aptos?rev=7b2d7949583169cc5c997856b4a0d17fec56ebf6#7b2d7949583169cc5c997856b4a0d17fec56ebf6" dependencies = [ "anyhow", "async-trait", @@ -10040,12 +10040,9 @@ dependencies = [ "alloy-transport", "anyhow", "api-types", - "once_cell", + "async-trait", + "hex", "reqwest", - "reth-primitives", - "reth-tracing", - "serde", - "serde_json", "tokio", "tracing", "url", diff --git a/Cargo.toml b/Cargo.toml index 2e68235d86..ccec43e0bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -377,7 +377,7 @@ codegen-units = 1 [workspace.dependencies] # reth -gravity-api-types = { package = "api-types", git = "https://github.com/Galxe/gravity-aptos", rev = "977f5b9388183c8a14c0ddcb4e2ac9f265d45184" } +gravity-api-types = { package = "api-types", git = "https://github.com/Galxe/gravity-aptos", rev = "7b2d7949583169cc5c997856b4a0d17fec56ebf6" } op-reth = { path = "crates/optimism/bin" } reth = { path = "bin/reth" } reth-storage-rpc-provider = { path = "crates/storage/rpc-provider" } diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/lib.rs b/crates/pipe-exec-layer-ext-v2/execute/src/lib.rs index 5613b55271..076574f4ca 100644 --- a/crates/pipe-exec-layer-ext-v2/execute/src/lib.rs +++ b/crates/pipe-exec-layer-ext-v2/execute/src/lib.rs @@ -4,7 +4,6 @@ mod channel; mod metrics; pub mod onchain_config; use alloy_sol_types::SolEvent; -pub use reth_pipe_exec_layer_relayer::{ObserveState, ObservedValue, RelayerManager}; use channel::Channel; use gravity_api_types::{ @@ -69,7 +68,8 @@ use tracing::*; use crate::onchain_config::{ construct_validator_txns_envelope, dkg::{convert_dkg_start_event_to_api, DKGStartEvent}, - observed_jwk::{convert_into_api_provider_jwks, ObservedJWKsUpdated}, + observed_jwk::convert_into_api_provider_jwks, + types::ObservedJWKsUpdated, SYSTEM_CALLER, }; diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_consensus_config.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_consensus_config.rs index d1c51550dc..842fe98b55 100644 --- a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_consensus_config.rs +++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_consensus_config.rs @@ -1,106 +1,47 @@ //! Fetcher for JWK consensus configuration from OracleTaskConfig //! -//! In the new Oracle architecture, provider configuration is stored in OracleTaskConfig -//! instead of JWKManager. The task is retrieved using: -//! OracleTaskConfig.getTask(sourceType=1, sourceId=0, taskName=keccak256("oidc_providers")) +//! In the new Oracle architecture, provider configuration is stored in OracleTaskConfig. +//! This fetcher enumerates ALL oracle tasks (JWK, blockchain, etc.) and formats them +//! as OIDCProviders for the gravity-aptos JWK consensus system. 
 use super::{
     base::{ConfigFetcher, OnchainConfigFetcher},
+    oracle_task_helpers::{OracleTaskClient, SOURCE_TYPE_JWK},
     ORACLE_TASK_CONFIG_ADDR, SYSTEM_CALLER,
 };
 use alloy_eips::BlockId;
 use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
 use alloy_rpc_types_eth::TransactionRequest;
 use alloy_sol_macro::sol;
-use alloy_sol_types::SolCall;
 use reth_rpc_eth_api::{helpers::EthCall, RpcTypes};
+use tracing::info;

-/// Source type for JWK in the Oracle system
-const SOURCE_TYPE_JWK: u32 = 1;
-
-/// Task name for OIDC providers configuration
-/// keccak256("oidc_providers")
+// Well-known task names
 fn oidc_providers_task_name() -> B256 {
     keccak256("oidc_providers")
 }

-// OracleTaskConfig contract ABI (aligned with
-// gravity_chain_core_contracts/src/oracle/IOracleTaskConfig.sol)
-sol! {
-    /// @notice Configuration for a continuous oracle task
-    struct OracleTask {
-        /// Task configuration bytes (ABI-encoded OIDCProvider[])
-        bytes config;
-        /// Timestamp when this task was last updated
-        uint64 updatedAt;
-    }
+// =============================================================================
+// JWK-specific ABI (not shared with observed_jwk)
+// =============================================================================

-    /// @notice OIDC Provider stored in the task config
-    /// Note: This is what's ABI-encoded inside OracleTask.config
-    struct OIDCProvider {
-        bytes name; // Provider name, e.g., "https://accounts.google.com"
-        bytes configUrl; // OpenID configuration URL
-        uint64 blockNumber; // Onchain block number
+sol! {
+    /// OIDC Provider stored in task config (for JWK sourceType=1)
+    struct TaskOIDCProvider {
+        bytes name;
+        bytes configUrl;
+        uint64 blockNumber;
     }
-
-    /// @notice Get an oracle task by its key tuple
-    function getTask(
-        uint32 sourceType,
-        uint256 sourceId,
-        bytes32 taskName
-    ) external view returns (OracleTask memory task);
 }

-/// Convert OracleTaskConfig providers to BCS-encoded JWKConsensusConfig
-fn convert_task_config_to_bcs(task: OracleTask) -> Option<Bytes> {
-    if task.config.is_empty() {
-        tracing::warn!("OracleTaskConfig: oidc_providers task has empty config");
-        let jwk_consensus_config = gravity_api_types::on_chain_config::jwks::JWKConsensusConfig {
-            enabled: false,
-            oidc_providers: Vec::new(),
-        };
-
-        return Some(
-            bcs::to_bytes(&jwk_consensus_config)
-                .expect("Failed to serialize JwkConsensusConfig")
-                .into(),
-        );
-    }
-
-    // ABI decode the config bytes to get OIDCProvider[]
-    // The config is ABI-encoded as a dynamic array of OIDCProvider structs
-    let providers = match <Vec<OIDCProvider> as alloy_sol_types::SolType>::abi_decode(
-        &task.config,
-    ) {
-        Ok(p) => p,
-        Err(e) => {
-            tracing::warn!("Failed to ABI decode OIDCProvider[] from task config: {:?}", e);
-            return None;
-        }
-    };
-
-    let active_providers = providers
-        .iter()
-        .map(|provider| gravity_api_types::on_chain_config::jwks::OIDCProvider {
-            name: String::from_utf8_lossy(&provider.name).to_string(),
-            config_url: String::from_utf8_lossy(&provider.configUrl).to_string(),
-            onchain_block_number: Some(provider.blockNumber),
-        })
-        .collect::<Vec<_>>();
-
-    let jwk_consensus_config = gravity_api_types::on_chain_config::jwks::JWKConsensusConfig {
-        enabled: false,
-        oidc_providers: active_providers,
-    };
-
-    Some(
-        bcs::to_bytes(&jwk_consensus_config)
-            .expect("Failed to serialize JwkConsensusConfig")
-            .into(),
-    )
-}
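For context, `OracleTask.config` for the JWK task is decoded as an ABI-encoded `TaskOIDCProvider[]`. A minimal round-trip sketch of that layout, reusing the struct definition added above (illustrative values, alloy only):

```rust
use alloy_sol_macro::sol;
use alloy_sol_types::SolValue;

sol! {
    struct TaskOIDCProvider {
        bytes name;
        bytes configUrl;
        uint64 blockNumber;
    }
}

fn main() {
    // Encode one provider the way OracleTaskConfig is assumed to store it,
    // then decode it back the way fetch_jwk_providers does.
    let providers = vec![TaskOIDCProvider {
        name: b"https://accounts.google.com".to_vec().into(),
        configUrl: b"https://accounts.google.com/.well-known/openid-configuration".to_vec().into(),
        blockNumber: 0,
    }];
    let encoded = providers.abi_encode();
    let decoded = <Vec<TaskOIDCProvider> as SolValue>::abi_decode(&encoded).unwrap();
    assert_eq!(decoded.len(), 1);
}
```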
+// =============================================================================
+// JwkConsensusConfigFetcher
+// =============================================================================

-/// Fetcher for JWK consensus configuration from OracleTaskConfig
+/// Fetcher for JWK consensus configuration
+///
+/// Enumerates ALL oracle tasks from OracleTaskConfig and formats them as OIDCProviders
+/// for gravity-aptos JWK consensus.
 #[derive(Debug)]
 pub struct JwkConsensusConfigFetcher<'a, EthApi> {
     base_fetcher: &'a OnchainConfigFetcher<EthApi>,
 }
@@ -109,11 +50,68 @@ impl<'a, EthApi> JwkConsensusConfigFetcher<'a, EthApi>
 where
     EthApi: EthCall,
+    EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
 {
-    /// Create a new consensus config fetcher
     pub const fn new(base_fetcher: &'a OnchainConfigFetcher<EthApi>) -> Self {
         Self { base_fetcher }
     }
+
+    /// Get shared oracle task client
+    fn oracle_client(&self) -> OracleTaskClient<'_, EthApi> {
+        OracleTaskClient::new(self.base_fetcher)
+    }
+
+    /// Fetch and parse JWK providers (sourceType=1)
+    fn fetch_jwk_providers(
+        &self,
+        block_id: BlockId,
+    ) -> Vec<gravity_api_types::on_chain_config::jwks::OIDCProvider> {
+        let mut providers = Vec::new();
+
+        // Try to get the "oidc_providers" task for JWK
+        if let Some(task) = self.oracle_client().call_get_task(
+            SOURCE_TYPE_JWK,
+            U256::ZERO,
+            oidc_providers_task_name(),
+            block_id,
+        ) {
+            if !task.config.is_empty() {
+                // ABI decode TaskOIDCProvider[]
+                if let Ok(oidc_providers) =
+                    <Vec<TaskOIDCProvider> as alloy_sol_types::SolType>::abi_decode(&task.config)
+                {
+                    for provider in oidc_providers {
+                        providers.push(gravity_api_types::on_chain_config::jwks::OIDCProvider {
+                            name: String::from_utf8_lossy(&provider.name).to_string(),
+                            config_url: String::from_utf8_lossy(&provider.configUrl).to_string(),
+                            onchain_nonce: None, // JWK providers don't use nonce
+                        });
+                    }
+                }
+            }
+        }
+
+        providers
+    }
+
+    /// Fetch and parse blockchain providers (sourceType=0)
+    /// Uses shared OracleTaskClient for task enumeration
+    fn fetch_blockchain_providers(
+        &self,
+        block_id: BlockId,
+    ) -> Vec<gravity_api_types::on_chain_config::jwks::OIDCProvider> {
+        let task_uris = self.oracle_client().fetch_blockchain_task_uris(block_id);
+
+        task_uris
+            .into_iter()
+            .map(|(uri, nonce)| {
+                info!(uri = %uri, nonce = nonce, "Found blockchain monitoring task");
+                gravity_api_types::on_chain_config::jwks::OIDCProvider {
+                    name: uri.clone(),
+                    config_url: uri,
+                    onchain_nonce: Some(nonce as u64),
+                }
+            })
+            .collect()
+    }
 }

 impl<'a, EthApi> ConfigFetcher for JwkConsensusConfigFetcher<'a, EthApi>
@@ -122,30 +120,28 @@ where
     EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
 {
     fn fetch(&self, block_id: BlockId) -> Option<Bytes> {
-        // Call OracleTaskConfig.getTask(SOURCE_TYPE_JWK, 0, keccak256("oidc_providers"))
-        let call = getTaskCall {
-            sourceType: SOURCE_TYPE_JWK,
-            sourceId: U256::ZERO,
-            taskName: oidc_providers_task_name(),
-        };
-        let input: Bytes = call.abi_encode().into();
-
-        let result = self
-            .base_fetcher
-            .eth_call(Self::caller_address(), Self::contract_address(), input, block_id)
-            .map_err(|e| {
-                tracing::warn!(
-                    "Failed to fetch JWK consensus config from OracleTaskConfig at block {}: {:?}",
-                    block_id,
-                    e
-                );
-            })
-            .ok()?;
+        let mut all_providers = Vec::new();

-        let task = getTaskCall::abi_decode_returns(&result)
-            .expect("Failed to decode getTask return value");
+        // 1. Fetch JWK providers (sourceType=1)
+        all_providers.extend(self.fetch_jwk_providers(block_id));

-        convert_task_config_to_bcs(task)
+        // 2. Fetch blockchain providers (sourceType=0)
+        all_providers.extend(self.fetch_blockchain_providers(block_id));
+
+        info!(provider_count = all_providers.len(), "Fetched oracle task providers");
+
+        // Build JWKConsensusConfig
+        // TODO(gravity): should read the onchain config
+        let jwk_consensus_config = gravity_api_types::on_chain_config::jwks::JWKConsensusConfig {
+            enabled: !all_providers.is_empty(),
+            oidc_providers: all_providers,
+        };
+
+        Some(
+            bcs::to_bytes(&jwk_consensus_config)
+                .expect("Failed to serialize JwkConsensusConfig")
+                .into(),
+        )
     }

     fn contract_address() -> Address {
diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_oracle.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_oracle.rs
new file mode 100644
index 0000000000..f339ee67f7
--- /dev/null
+++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/jwk_oracle.rs
@@ -0,0 +1,269 @@
+//! JWK Oracle module for writing oracle updates via NativeOracle.record()
+//!
+//! This module handles the WRITE path for ALL oracle updates in the new Oracle architecture:
+//! - RSA JWKs: NativeOracle.record(sourceType=1, sourceId=keccak256(issuer))
+//! - UnsupportedJWK (blockchain events): NativeOracle.recordBatch() for multiple logs
+//!
+//! For blockchain events, the payload from relayer is ABI-encoded and passed through unchanged.
+//! This ensures byte-exact match between relayer, on-chain storage, and read-back for comparison.
+
+use super::{
+    new_system_call_txn,
+    types::{convert_oracle_rsa_to_api_jwk, GaptosRsaJwk, OracleRSA_JWK, SOURCE_TYPE_JWK},
+    NATIVE_ORACLE_ADDR,
+};
+use alloy_primitives::{keccak256, Bytes, U256};
+use alloy_sol_macro::sol;
+use alloy_sol_types::SolCall;
+use gravity_api_types::on_chain_config::jwks::{JWKStruct, ProviderJWKs};
+use reth_ethereum_primitives::TransactionSigned;
+use tracing::{debug, info};
+
+/// Default callback gas limit for oracle updates
+const CALLBACK_GAS_LIMIT: u64 = 500_000;
+
+// =============================================================================
+// Solidity Types (NativeOracle function signatures)
+// =============================================================================
+
+sol! {
+    /// NativeOracle.record() function signature
+    function record(
+        uint32 sourceType,
+        uint256 sourceId,
+        uint128 nonce,
+        bytes calldata payload,
+        uint256 callbackGasLimit
+    ) external;
+
+    /// NativeOracle.recordBatch() function signature for multiple events
+    function recordBatch(
+        uint32 sourceType,
+        uint256 sourceId,
+        uint128[] calldata nonces,
+        bytes[] calldata payloads,
+        uint256[] calldata callbackGasLimits
+    ) external;
+}
+
+// =============================================================================
+// Helper Functions
+// =============================================================================
+
+/// Check if a JWKStruct is an RSA JWK
+fn is_rsa_jwk(jwk: &JWKStruct) -> bool {
+    jwk.type_name == "0x1::jwks::RSA_JWK"
+}
+
+/// Check if a JWKStruct is an UnsupportedJWK (blockchain/other oracle data)
+/// Checks for sourceType string (0, 1, 2, etc.) instead of fixed type_name
+fn is_unsupported_jwk(jwk: &JWKStruct) -> bool {
+    // Check if type_name is a numeric string (sourceType)
+    jwk.type_name.parse::<u32>().is_ok()
+}
+
+/// Parse RSA JWK from BCS-encoded data
+fn parse_rsa_jwk_from_bcs(data: &[u8]) -> Option<OracleRSA_JWK> {
+    let gaptos_jwk: GaptosRsaJwk = bcs::from_bytes(data).ok()?;
+    Some(OracleRSA_JWK {
+        kid: gaptos_jwk.kid,
+        kty: gaptos_jwk.kty,
+        alg: gaptos_jwk.alg,
+        e: gaptos_jwk.e,
+        n: gaptos_jwk.n,
+    })
+}
+
+/// Parse chain_id from issuer URI
+/// Format: gravity://{chain_id}/events?...
+fn parse_chain_id_from_issuer(issuer: &[u8]) -> Option<u32> {
+    let issuer_str = String::from_utf8_lossy(issuer);
+    if issuer_str.starts_with("gravity://") {
+        let after_protocol = &issuer_str[10..];
+        if let Some(slash_pos) = after_protocol.find('/') {
+            let chain_id_str = &after_protocol[..slash_pos];
+            return chain_id_str.parse().ok();
+        }
+    }
+    None
+}
+
+/// Extract nonce from ABI-encoded event payload
+/// Payload format: abi.encode(address, bytes32[], bytes, uint64 block_number, uint64 log_index)
+/// Returns block_number * 1000 + log_index as unique nonce
+fn extract_nonce_from_payload(payload: &[u8]) -> Option<u128> {
+    // ABI-encoded payload structure:
+    // - address (32 bytes padded)
+    // - offset to topics array (32 bytes)
+    // - offset to data bytes (32 bytes)
+    // - block_number (32 bytes as uint64)
+    // - log_index (32 bytes as uint64)
+    // Then dynamic data...
+
+    if payload.len() < 160 {
+        // Minimum: 5 * 32 bytes for fixed parts
+        return None;
+    }
+
+    // block_number is at offset 96 (3 * 32)
+    let block_number_bytes = &payload[96..128];
+    let block_number = U256::from_be_slice(&block_number_bytes[..32]);
+
+    // log_index is at offset 128 (4 * 32)
+    let log_index_bytes = &payload[128..160];
+    let log_index = U256::from_be_slice(&log_index_bytes[..32]);
+
+    let nonce = block_number.saturating_to::<u128>() * 1000 + log_index.saturating_to::<u128>();
+    Some(nonce)
+}
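A quick check of the payload layout assumed by `extract_nonce_from_payload`, using alloy's tuple encoding as a stand-in for the relayer's `abi.encode` (illustrative values):

```rust
use alloy_primitives::{Address, Bytes, B256, U256};
use alloy_sol_types::SolValue;

fn main() {
    // abi.encode(address, bytes32[], bytes, uint64, uint64): the two uint64
    // words land at byte offsets 96 and 128, which is what the parser reads.
    let payload = (
        Address::ZERO,
        vec![B256::ZERO],
        Bytes::from(vec![0xaa]),
        42u64, // block_number
        7u64,  // log_index
    )
        .abi_encode();
    assert_eq!(U256::from_be_slice(&payload[96..128]), U256::from(42));
    assert_eq!(U256::from_be_slice(&payload[128..160]), U256::from(7));
    // nonce = block_number * 1000 + log_index, assuming < 1000 logs per block.
    assert_eq!(42u128 * 1000 + 7, 42_007);
}
```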
+
+// =============================================================================
+// Public API
+// =============================================================================
+
+/// Construct transaction for oracle update via NativeOracle.record()
+///
+/// This is the unified entry point for ALL oracle updates. It routes based on JWK type:
+/// - RSA_JWK → sourceType=1 (JWK), payload=ABI(issuer, version, jwks[])
+/// - UnsupportedJWK → Uses recordBatch for ALL logs (payload passed through unchanged)
+///
+/// Note: All JWKs in provider_jwks.jwks are guaranteed to be of the same type
+/// (either all RSA or all unsupported), so we only check the first element.
+pub fn construct_oracle_record_transaction(
+    provider_jwks: ProviderJWKs,
+    nonce: u64,
+    gas_price: u128,
+) -> Result<TransactionSigned, String> {
+    let issuer = &provider_jwks.issuer;
+    let issuer_str = String::from_utf8_lossy(issuer);
+
+    // All JWKs are homogeneous, check the first one to determine the type
+    let first_jwk = provider_jwks
+        .jwks
+        .first()
+        .ok_or_else(|| format!("No JWKs found for issuer: {}", issuer_str))?;
+
+    if is_rsa_jwk(first_jwk) {
+        // RSA JWK update
+        construct_jwk_record_transaction(provider_jwks, nonce, gas_price)
+    } else if is_unsupported_jwk(first_jwk) {
+        // Blockchain/oracle events - use recordBatch for ALL logs
+        construct_blockchain_batch_transaction(provider_jwks, nonce, gas_price)
+    } else {
+        Err(format!("Unknown JWK type '{}' for issuer: {}", first_jwk.type_name, issuer_str))
+    }
+}
+
+/// Construct transaction for JWK update (sourceType=1)
+fn construct_jwk_record_transaction(
+    provider_jwks: ProviderJWKs,
+    nonce: u64,
+    gas_price: u128,
+) -> Result<TransactionSigned, String> {
+    let issuer = &provider_jwks.issuer;
+    let version = provider_jwks.version;
+
+    // All JWKs are guaranteed to be RSA type when entering this function
+    let rsa_jwks: Vec<OracleRSA_JWK> =
+        provider_jwks.jwks.iter().filter_map(|jwk| parse_rsa_jwk_from_bcs(&jwk.data)).collect();
+
+    info!(
+        issuer = %String::from_utf8_lossy(issuer),
+        version = version,
+        jwk_count = rsa_jwks.len(),
+        "Constructing JWK record transaction"
+    );
+
+    // sourceId = keccak256(issuer)
+    let issuer_hash = keccak256(issuer);
+    let source_id = U256::from_be_bytes(issuer_hash.0);
+
+    // Encode payload: (bytes issuer, uint64 version, OracleRSA_JWK[] jwks)
+    let payload = alloy_sol_types::SolValue::abi_encode(&(issuer.as_slice(), version, rsa_jwks));
+
+    let call = recordCall {
+        sourceType: SOURCE_TYPE_JWK,
+        sourceId: source_id,
+        nonce: nonce as u128,
+        payload: payload.into(),
+        callbackGasLimit: U256::from(CALLBACK_GAS_LIMIT),
+    };
+
+    let input: Bytes = call.abi_encode().into();
+    Ok(new_system_call_txn(NATIVE_ORACLE_ADDR, nonce, gas_price, input))
+}
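The `sourceId` derivation above, in isolation (illustrative issuer):

```rust
use alloy_primitives::{keccak256, U256};

fn main() {
    // sourceId = keccak256(issuer) reinterpreted as a uint256, so each issuer
    // maps to a stable NativeOracle source key.
    let issuer = b"https://accounts.google.com";
    let source_id = U256::from_be_bytes(keccak256(issuer).0);
    assert_eq!(source_id.to_be_bytes::<32>(), keccak256(issuer).0);
}
```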
+
+/// Construct transaction for blockchain events using recordBatch()
+///
+/// This handles ALL UnsupportedJWK entries (each represents one event).
+/// The payload is passed through UNCHANGED from relayer - this ensures
+/// byte-exact match between what relayer sends and what gets stored on-chain.
+fn construct_blockchain_batch_transaction(
+    provider_jwks: ProviderJWKs,
+    nonce: u64,
+    gas_price: u128,
+) -> Result<TransactionSigned, String> {
+    let issuer = &provider_jwks.issuer;
+    let jwks = &provider_jwks.jwks;
+
+    // Parse chain_id from issuer
+    let chain_id = parse_chain_id_from_issuer(issuer)
+        .ok_or_else(|| format!("Failed to parse chain_id from issuer: {:?}", issuer))?;
+
+    // All JWKs are guaranteed to be unsupported type when entering this function
+    if jwks.is_empty() {
+        return Err("No blockchain event JWKs found".to_string());
+    }
+
+    // Parse sourceType from first JWK's type_name (all have same type)
+    let source_type: u32 = jwks[0]
+        .type_name
+        .parse()
+        .map_err(|_| format!("Invalid sourceType: {}", jwks[0].type_name))?;
+
+    // Build batch arrays
+    let mut nonces: Vec<u128> = Vec::with_capacity(jwks.len());
+    let mut payloads: Vec<Bytes> = Vec::with_capacity(jwks.len());
+    let mut gas_limits: Vec<U256> = Vec::with_capacity(jwks.len());
+
+    for (idx, jwk) in jwks.iter().enumerate() {
+        // Extract nonce from the ABI-encoded payload
+        let event_nonce = extract_nonce_from_payload(&jwk.data)
+            .ok_or_else(|| format!("Failed to extract nonce from payload at index {}", idx))?;
+
+        nonces.push(event_nonce);
+        // Pass payload through UNCHANGED - this is critical for comparison matching
+        // The relayer already ABI-encoded the data, we just store it as-is
+        payloads.push(jwk.data.clone().into());
+        gas_limits.push(U256::from(CALLBACK_GAS_LIMIT));
+
+        debug!(
+            idx = idx,
+            event_nonce = event_nonce,
+            payload_len = jwk.data.len(),
+            "Added event to batch (pass-through)"
+        );
+    }
+
+    info!(
+        issuer = %String::from_utf8_lossy(issuer),
+        chain_id = chain_id,
+        source_type = source_type,
+        event_count = nonces.len(),
+        "Constructing blockchain recordBatch transaction (pass-through payload)"
+    );
+
+    // Use recordBatch for multiple events
+    let call = recordBatchCall {
+        sourceType: source_type,
+        sourceId: U256::from(chain_id),
+        nonces,
+        payloads,
+        callbackGasLimits: gas_limits,
+    };
+
+    let input: Bytes = call.abi_encode().into();
+    Ok(new_system_call_txn(NATIVE_ORACLE_ADDR, nonce, gas_price, input))
+}
+
+// convert_oracle_rsa_to_api_jwk is now provided by super::types
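Condensed, the routing implemented by `construct_oracle_record_transaction` behaves like this sketch:

```rust
fn main() {
    // Move-style type names are RSA JWKs; numeric type names are sourceTypes
    // carried by relayer events; anything else is rejected.
    fn route(type_name: &str) -> &'static str {
        if type_name == "0x1::jwks::RSA_JWK" {
            "record"
        } else if type_name.parse::<u32>().is_ok() {
            "recordBatch"
        } else {
            "unknown"
        }
    }
    assert_eq!(route("0x1::jwks::RSA_JWK"), "record");
    assert_eq!(route("0"), "recordBatch");
    assert_eq!(route("0x1::jwks::UnsupportedJWK"), "unknown");
}
```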
diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/mod.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/mod.rs
index 6231331200..d51f380ad7 100644
--- a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/mod.rs
+++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/mod.rs
@@ -7,8 +7,10 @@ pub mod consensus_config;
 pub mod dkg;
 pub mod epoch;
 pub mod jwk_consensus_config;
+pub mod jwk_oracle;
 pub mod metadata_txn;
 pub mod observed_jwk;
+pub mod oracle_task_helpers;
 pub mod types;
 pub mod validator_set;

@@ -97,7 +99,7 @@ use alloy_consensus::{EthereumTxEnvelope, TxEip4844, TxLegacy};
 use alloy_primitives::{Bytes, Signature, U256};
 use reth_ethereum_primitives::{Transaction, TransactionSigned};
 use revm_primitives::TxKind;
-use tracing::{info, debug};
+use tracing::{debug, info};

 /// Construct validator transactions envelope (JWK updates and DKG transcripts)
 ///
@@ -130,7 +132,7 @@ pub fn construct_validator_txns_envelope(
 /// Process extra data based on its ExtraDataType variant
 ///
 /// Supports:
-/// - JWK updates (ExtraDataType::JWK)
+/// - JWK/Oracle updates (ExtraDataType::JWK) - includes both RSA JWKs and blockchain events
 /// - DKG transcripts (ExtraDataType::DKG)
 fn process_extra_data(
     data: &gravity_api_types::ExtraDataType,
@@ -144,11 +146,13 @@
                 gravity_api_types::on_chain_config::jwks::ProviderJWKs,
             >(data_bytes)
             .map_err(|e| format!("Failed to deserialize JWK data: {}", e))?;
-            info!(
-                "Processing JWK update for issuer: {}",
-                String::from_utf8_lossy(&provider_jwks.issuer)
-            );
-            observed_jwk::construct_jwk_transaction(provider_jwks, nonce, gas_price)
+
+            let issuer_str = String::from_utf8_lossy(&provider_jwks.issuer);
+            info!("Processing oracle update for issuer: {}", issuer_str);
+
+            // Unified handler for all oracle data (JWK and blockchain events)
+            // Routing between sourceType is done inside construct_oracle_record_transaction
+            jwk_oracle::construct_oracle_record_transaction(provider_jwks, nonce, gas_price)
         }
         gravity_api_types::ExtraDataType::DKG(data_bytes) => {
             // Deserialize as DKGTranscript
diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/observed_jwk.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/observed_jwk.rs
index 17ce40fdc2..3450ca6d27 100644
--- a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/observed_jwk.rs
+++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/observed_jwk.rs
@@ -1,405 +1,50 @@
-//! Fetcher for JWK (JSON Web Key) on-chain configuration
+//! Fetcher for Oracle on-chain data
+//!
+//! This module handles the READ path for ALL oracle data from the chain:
+//! - RSA JWKs from JWKManager contract
+//! - Blockchain events from NativeOracle (sourced from OracleTaskConfig)
+//!
+//! Both are packaged as AllProvidersJWKs for gravity-aptos JWK consensus.
+//! The data format matches what the relayer sends for byte-exact comparison.

 use super::{
     base::{ConfigFetcher, OnchainConfigFetcher},
-    JWK_MANAGER_ADDR, NATIVE_ORACLE_ADDR, SYSTEM_CALLER,
+    oracle_task_helpers::OracleTaskClient,
+    types::{convert_oracle_rsa_to_api_jwk, getObservedJWKsCall, OracleProviderJWKs},
+    JWK_MANAGER_ADDR, SYSTEM_CALLER,
 };
 use alloy_eips::BlockId;
-use alloy_primitives::{keccak256, Address, Bytes, B256, U256};
+use alloy_primitives::{Address, Bytes};
 use alloy_rpc_types_eth::TransactionRequest;
-use alloy_sol_macro::sol;
-use alloy_sol_types::{SolCall, SolEvent, SolType};
-use gravity_api_types::on_chain_config::jwks::JWKStruct;
-use reth_ethereum_primitives::TransactionSigned;
+use alloy_sol_types::SolCall;
+use gravity_api_types::on_chain_config::jwks::ProviderJWKs;
 use reth_rpc_eth_api::{helpers::EthCall, RpcTypes};
 use std::fmt::Debug;
-use tracing::info;
-
-sol! {
-    event DepositGravityEvent(
-        address user,
-        uint256 amount,
-        address targetAddress,
-        uint256 blockNumber
-    );
-
-    event ChangeRecord(
-        bytes32 key,
-        bytes32 value,
-        uint256 blockNumber,
-        address updater,
-        uint256 sequenceNumber
-    );
-}
-
-sol! {
-    struct CrossChainParams {
-        // 1 => CrossChainDepositEvent
-        bytes id;
-        address sender;
-        address targetAddress;
-        uint256 amount;
-        uint256 blockNumber;
-        string issuer;
-        bytes data; // Extra data (used for hash records, etc.)
-    }
-
-    // 0 => Raw,
-    // 1 => CrossChainDepositEvent
-    struct UnsupportedJWK {
-        bytes id;
-        bytes payload;
-    }
-    struct JWK {
-        uint8 variant; // 0: RSA_JWK, 1: UnsupportedJWK
-        bytes data; // Encoded JWK data
-    }
-
-    /// @dev Provider's JWK collection
-    struct ProviderJWKs {
-        string issuer; // Issuer
-        uint64 version; // Version number
-        JWK[] jwks; // JWK array, sorted by kid
-    }
-
-    /// @dev All providers' JWK collection
-    struct AllProvidersJWKs {
-        ProviderJWKs[] entries; // Provider array sorted by issuer
-    }
-    function getObservedJWKs() external view returns (AllProvidersJWKs memory);
-
-    function upsertObservedJWKs(
-        ProviderJWKs[] calldata providerJWKsArray,
-        CrossChainParams[] calldata crossChainParamsArray
-    ) external;
-
-    event ObservedJWKsUpdated(uint256 indexed epoch, ProviderJWKs[] jwks);
-}
+use tracing::{debug, info};

 // =============================================================================
-// New Oracle Contract Types (gravity_chain_core_contracts/src/oracle/jwk/)
+// Conversion Functions
 // =============================================================================

-sol! {
-    /// @dev RSA JWK structure from new Oracle JWKManager contract
-    /// Note: This struct does NOT have a `kty` field, but gaptos expects one.
-    /// See convert_oracle_rsa_to_api_jwk() for the workaround.
-    #[derive(Debug)]
-    struct OracleRSA_JWK {
-        string kid; // Key ID
-        string alg; // Algorithm (e.g., "RS256")
-        string e; // RSA public exponent (Base64url)
-        string n; // RSA modulus (Base64url)
-    }
-
-    /// @dev Provider's JWK collection from new Oracle JWKManager
-    struct OracleProviderJWKs {
-        bytes issuer; // Issuer URL
-        uint64 version; // Version number
-        OracleRSA_JWK[] jwks; // RSA JWK array
-    }
-
-    /// @dev All providers' JWK collection from new Oracle JWKManager
-    struct OracleAllProvidersJWKs {
-        OracleProviderJWKs[] entries;
-    }
-
-    /// @dev NativeOracle.record() function signature
-    function record(
-        uint32 sourceType,
-        uint256 sourceId,
-        uint128 nonce,
-        bytes calldata payload,
-        uint256 callbackGasLimit
-    ) external;
-}
-
-/// Convert new Oracle contract RSA_JWK to api-types JWKStruct
-///
-/// The new Oracle JWKManager contract stores RSA_JWK{kid, alg, e, n} without a `kty` field.
-/// However, gaptos expects RSA_JWK{kid, kty, alg, e, n} with all 5 fields.
-/// This function adds the missing `kty` field with a hardcoded value "RSA".
-///
-/// TODO(gravity): Add `kty` field to the Solidity RSA_JWK struct in
-/// gravity_chain_core_contracts/src/oracle/jwk/IJWKManager.sol to properly store this value
-/// on-chain instead of hardcoding it here.
-fn convert_oracle_rsa_to_api_jwk(rsa_jwk: OracleRSA_JWK) -> JWKStruct {
-    // Create gaptos-compatible RSA_JWK struct with all 5 fields
-    // The struct order in gaptos is: kid, kty, alg, e, n
-    #[derive(serde::Serialize)]
-    struct GaptosRsaJwk {
-        kid: String,
-        kty: String,
-        alg: String,
-        e: String,
-        n: String,
-    }
-
-    let gaptos_rsa = GaptosRsaJwk {
-        kid: rsa_jwk.kid,
-        kty: "RSA".to_string(), // TODO(gravity): Read from contract once kty field is added
-        alg: rsa_jwk.alg,
-        e: rsa_jwk.e,
-        n: rsa_jwk.n,
-    };
-
-    JWKStruct {
-        type_name: "0x1::jwks::RSA_JWK".to_string(),
-        data: bcs::to_bytes(&gaptos_rsa).expect("Failed to BCS serialize RSA_JWK"),
-    }
-}
-
-fn convert_into_api_jwk(jwk: JWK) -> JWKStruct {
-    if jwk.variant == 0 {
-        // Note: Gravity relayer does not fetch RSA JWKs directly. RSA JWKs are fetched in Aptos
-        JWKStruct { type_name: "0x1::jwks::RSA_JWK".to_string(), data: jwk.data.into() }
-    } else {
-        // All data fetched by gravity relayer is contained within UnsupportedJWK in the data field
-        JWKStruct { type_name: "0x1::jwks::UnsupportedJWK".to_string(), data: jwk.data.into() }
-    }
-}
-
-pub fn convert_into_api_provider_jwks(
-    provider_jwks: ProviderJWKs,
-) -> gravity_api_types::on_chain_config::jwks::ProviderJWKs {
-    gravity_api_types::on_chain_config::jwks::ProviderJWKs {
-        issuer: provider_jwks.issuer.into(),
-        version: provider_jwks.version,
-        jwks: provider_jwks
-            .jwks
-            .iter()
-            .map(|jwk: &JWK| convert_into_api_jwk(jwk.clone()))
-            .collect::<Vec<_>>(),
-    }
-}
-
-fn convert_into_sol_provider_jwks(
-    provider_jwks: gravity_api_types::on_chain_config::jwks::ProviderJWKs,
-) -> ProviderJWKs {
+/// Convert Oracle ProviderJWKs to api-types ProviderJWKs
+fn convert_oracle_provider_jwks(provider_jwks: OracleProviderJWKs) -> ProviderJWKs {
     ProviderJWKs {
-        issuer: String::from_utf8(provider_jwks.issuer)
-            .expect("Failed to convert issuer to string"),
+        issuer: provider_jwks.issuer.to_vec(),
         version: provider_jwks.version,
-        jwks: provider_jwks
-            .jwks
-            .into_iter()
-            .map(|jwk| {
-                let variant = match jwk.type_name.as_str() {
-                    "0x1::jwks::RSA_JWK" => 0,
-                    _ => 1,
-                };
-                JWK { variant, data: jwk.data.into() }
-            })
-            .collect(),
-    }
-}
-
-/// Parse chain_id from issuer URI
-/// Issuer format:
-/// gravity://{chain_id}/event?address={contract_address}&topic0={topic0}&fromBlock={from_block}
-fn parse_chain_id_from_issuer(issuer: &str) -> Option<u32> {
-    if issuer.starts_with("gravity://") {
-        // Extract the part after "gravity://" and before the next "/"
-        let after_protocol = &issuer[10..]; // Skip "gravity://"
-        if let Some(slash_pos) = after_protocol.find('/') {
-            let chain_id_str = &after_protocol[..slash_pos];
-            return chain_id_str.parse().ok();
-        }
+        jwks: provider_jwks.jwks.into_iter().map(convert_oracle_rsa_to_api_jwk).collect(),
     }
-    None
 }

-fn convert_into_sol_crosschain_params(jwks: &Vec<JWK>, issuer: &str) -> Vec<CrossChainParams> {
-    jwks.iter()
-        .filter(|jwk| jwk.variant == 1)
-        .map(|jwk| process_unsupported_jwk(jwk, &issuer))
-        .collect()
+/// Convert Oracle ProviderJWKs to api-types ProviderJWKs (pub for lib.rs event parsing)
+pub fn convert_into_api_provider_jwks(provider_jwks: OracleProviderJWKs) -> ProviderJWKs {
+    convert_oracle_provider_jwks(provider_jwks)
 }

-fn process_unsupported_jwk(jwk: &JWK, issuer: &str) -> CrossChainParams {
-    let unsupported_jwk = UnsupportedJWK::abi_decode(&jwk.data).unwrap();
-    let id_string = String::from_utf8(unsupported_jwk.id.to_vec())
-        .expect("Failed to convert id bytes to string");
-    let data_type: u8 = id_string.parse().expect("Failed to parse data_type from string");
-
-    match data_type {
-        hash if hash == 1 => {
-            // DepositGravityEvent
-            let event = DepositGravityEvent::abi_decode_data(&unsupported_jwk.payload).unwrap();
-
-            info!(target: "observed_jwk stake event",
-                user=?event.0,
-                amount=?event.1,
-                target_address=?event.2,
-                block_number=?event.3,
-                "observed_jwk stake event created"
-            );
-            CrossChainParams {
-                id: unsupported_jwk.id,
-                sender: event.0,
-                targetAddress: event.2,
-                amount: event.1,
-                blockNumber: event.3,
-                issuer: issuer.to_string(),
-                data: Bytes::new(), // Empty in deposit mode
-            }
-        }
-        hash if hash == 2 => {
-            // ChangeRecord
-            // All parameters are non-indexed, so all fields are in the data part
-            let event = ChangeRecord::abi_decode_data(&unsupported_jwk.payload).unwrap();
-            let hash_value: B256 = event.1;
-            let block_number = event.2;
-            let sequence_number = event.4;
-
-            //bytes memory data = crossChainParam.data;
-            // require(data.length == 76, "Invalid hash record data length");
-
-            // bytes32 hash;
-            // uint64 sourceBlockNumber;
-            // uint32 sourceChainId;
-            // uint256 sequenceNumber;
-
-            // assembly {
-            //     hash := mload(add(data, 32))
-            //     sourceBlockNumber := mload(add(data, 64))
-            //     sourceChainId := mload(add(data, 72))
-            //     sequenceNumber := mload(add(data, 104))
-            // }
-            // Build 76-byte data according to Solidity contract expectations:
-            // - hash (bytes32): 32 bytes at offset 0 (from event.1)
-            // - sourceBlockNumber (uint64): 8 bytes at offset 32 (from event.2, converted to u64)
-            // - sourceChainId (uint32): 4 bytes at offset 40 (parsed from issuer)
-            // - sequenceNumber (uint256): 32 bytes at offset 44 (from event.4)
-            // Total: 76 bytes
-            let mut data_bytes = vec![0u8; 76];
-
-            // hash (bytes32) - 32 bytes at offset 0 (event.1)
-            data_bytes[0..32].copy_from_slice(hash_value.as_slice());
-
-            // sourceBlockNumber (uint64) - 8 bytes at offset 32 (event.2 converted to u64)
-            let source_block_number = block_number.to::<u64>();
-            data_bytes[32..40].copy_from_slice(&source_block_number.to_be_bytes());
-
-            // sourceChainId (uint32) - 4 bytes at offset 40 (parsed from issuer)
-            let source_chain_id = parse_chain_id_from_issuer(issuer).unwrap_or(0u32);
-
-            info!(target: "observed_jwk change record event",
-                key=?event.0,
-                hash=?hash_value,
-                block_number=?block_number,
-                updater=?event.3,
-                sequence_number=?sequence_number,
-                source_chain_id=?source_chain_id,
-                "observed_jwk change record event created"
-            );
-
-            data_bytes[40..44].copy_from_slice(&source_chain_id.to_be_bytes());
-
-            // sequenceNumber (uint256) - 32 bytes at offset 44 (event.4)
-            let sequence_bytes: [u8; 32] = sequence_number.to_be_bytes();
-            data_bytes[44..76].copy_from_slice(&sequence_bytes);
-
-            CrossChainParams {
-                id: unsupported_jwk.id,
-                sender: event.3, // updater is in event.3
-                targetAddress: Address::ZERO, // ChangeRecord doesn't have targetAddress
-                amount: U256::ZERO, // ChangeRecord doesn't have amount
-                blockNumber: block_number,
-                issuer: issuer.to_string(),
-                data: Bytes::from(data_bytes), // Store 76-byte data as expected by contract
-            }
-        }
-        _ => panic!("Unsupported event type: {:?}, id: {:?}", data_type, unsupported_jwk.id),
-    }
-}
-
-fn convert_into_api_all_providers_jwks(
-    all_providers_jwks: AllProvidersJWKs,
-) -> gravity_api_types::on_chain_config::jwks::AllProvidersJWKs {
-    gravity_api_types::on_chain_config::jwks::AllProvidersJWKs {
-        entries: all_providers_jwks
-            .entries
-            .iter()
-            .map(|provider_jwks: &ProviderJWKs| {
-                convert_into_api_provider_jwks(provider_jwks.clone())
-            })
-            .collect::<Vec<_>>(),
-    }
-}
-
-fn convert_into_observed_jwks(
-    all_providers_jwks: AllProvidersJWKs,
-) -> gravity_api_types::on_chain_config::jwks::ObservedJWKs {
-    gravity_api_types::on_chain_config::jwks::ObservedJWKs {
-        jwks: convert_into_api_all_providers_jwks(all_providers_jwks),
-    }
-}
-
-fn convert_into_bcs_all_providers_jwks(all_providers_jwks: AllProvidersJWKs) -> Bytes {
-    let all_providers = convert_into_observed_jwks(all_providers_jwks);
-    bcs::to_bytes(&all_providers).expect("Failed to serialize AllProvidersJWKs").into()
-}
-
-/// Source type constants for NativeOracle
-const SOURCE_TYPE_BLOCKCHAIN: u32 = 0;
-const SOURCE_TYPE_JWK: u32 = 1;
-
-/// Default callback gas limit for JWK updates
-const JWK_CALLBACK_GAS_LIMIT: u64 = 500_000;
-
-/// Construct JWK transaction from ProviderJWKs
-///
-/// In the new Oracle architecture, JWK updates are recorded via NativeOracle.record().
-/// - RSA JWKs → NativeOracle.record(sourceType=1, sourceId=keccak256(issuer))
-/// - UnsupportedJWK with id="1" (Deposit) → NativeOracle.record(sourceType=0)
-/// - UnsupportedJWK with id="2" (ChangeRecord) → NativeOracle.record(sourceType=0)
-pub(crate) fn construct_jwk_transaction(
-    provider_jwks: gravity_api_types::on_chain_config::jwks::ProviderJWKs,
-    nonce: u64,
-    gas_price: u128,
-) -> Result<TransactionSigned, String> {
-    let sol_provider_jwks = convert_into_sol_provider_jwks(provider_jwks.clone());
-
-    // Calculate sourceId from issuer hash
-    let issuer_hash = keccak256(&provider_jwks.issuer);
-    let source_id = U256::from_be_bytes(issuer_hash.0);
-
-    // Encode payload: (bytes issuer, uint64 version, RSA_JWK[] jwks)
-    // Note: This encodes the RSA JWKs for JWKManager callback
-    let rsa_jwks: Vec<OracleRSA_JWK> = sol_provider_jwks
-        .jwks
-        .iter()
-        .filter(|jwk| jwk.variant == 0) // Only RSA JWKs
-        .map(|jwk| {
-            // Decode RSA_JWK from JWK.data
-            // For now, we pass through the data as-is since the format matches
-            // TODO: Proper conversion if needed
-            OracleRSA_JWK {
-                kid: String::new(), // Will be filled from decoded data
-                alg: String::new(),
-                e: String::new(),
-                n: String::new(),
-            }
-        })
-        .collect();
-
-    // For now, continue using the old upsertObservedJWKs for backward compatibility
-    // TODO: Migrate to NativeOracle.record() once contract migration is complete
-    let cross_chain_params = convert_into_sol_crosschain_params(
-        &sol_provider_jwks.jwks,
-        sol_provider_jwks.issuer.as_str(),
-    );
-
-    let call = upsertObservedJWKsCall {
-        providerJWKsArray: vec![sol_provider_jwks],
-        crossChainParamsArray: cross_chain_params,
-    };
-    let input: Bytes = call.abi_encode().into();
-    Ok(super::new_system_call_txn(JWK_MANAGER_ADDR, nonce, gas_price, input))
-}
+// =============================================================================
+// ObservedJwkFetcher - Unified READ Path
+// =============================================================================

-/// Fetcher for consensus configuration
+/// Fetcher for all oracle data (JWKs + blockchain events)
 #[derive(Debug)]
 pub struct ObservedJwkFetcher<'a, EthApi> {
     base_fetcher: &'a OnchainConfigFetcher<EthApi>,
@@ -408,33 +53,84 @@ pub struct ObservedJwkFetcher<'a, EthApi>
 impl<'a, EthApi> ObservedJwkFetcher<'a, EthApi>
 where
     EthApi: EthCall,
+    EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
 {
-    /// Create a new consensus config fetcher
     pub const fn new(base_fetcher: &'a OnchainConfigFetcher<EthApi>) -> Self {
         Self { base_fetcher }
     }
-}

-impl<'a, EthApi> ConfigFetcher for ObservedJwkFetcher<'a, EthApi>
-where
-    EthApi: EthCall,
-    EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
-{
-    fn fetch(&self, block_id: BlockId) -> Option<Bytes> {
+    /// Get shared oracle task client
+    fn oracle_client(&self) -> OracleTaskClient<'_, EthApi> {
+        OracleTaskClient::new(self.base_fetcher)
+    }
+
+    /// Fetch JWKs from JWKManager contract
+    fn fetch_jwk_manager_providers(&self, block_id: BlockId) -> Option<Vec<ProviderJWKs>> {
         let call = getObservedJWKsCall {};
         let input: Bytes = call.abi_encode().into();

         let result = self
             .base_fetcher
-            .eth_call(Self::caller_address(), Self::contract_address(), input, block_id)
+            .eth_call(SYSTEM_CALLER, JWK_MANAGER_ADDR, input, block_id)
             .map_err(|e| {
-                tracing::warn!("Failed to fetch observed JWKs at block {}: {:?}", block_id, e);
+                debug!("Failed to fetch JWKs from JWKManager: {:?}", e);
             })
             .ok()?;

-        let solidity_all_providers_jwks = getObservedJWKsCall::abi_decode_returns(&result)
-            .expect("Failed to decode getObservedJWKs return value");
-        Some(convert_into_bcs_all_providers_jwks(solidity_all_providers_jwks))
+        let oracle_jwks = getObservedJWKsCall::abi_decode_returns(&result).ok()?;
+
+        Some(oracle_jwks.entries.into_iter().map(convert_oracle_provider_jwks).collect())
+    }
+
+    /// Fetch blockchain event providers using shared OracleTaskClient
+    fn fetch_blockchain_providers(&self, block_id: BlockId) -> Vec<ProviderJWKs> {
+        let task_uris = self.oracle_client().fetch_blockchain_task_uris(block_id);
+
+        task_uris
+            .into_iter()
+            .map(|(uri, nonce)| ProviderJWKs {
+                issuer: uri.into_bytes(),
+                version: nonce as u64,
+                jwks: vec![], // Empty - only use version for comparison
+            })
+            .collect()
+    }
+}
+
+impl<'a, EthApi> ConfigFetcher for ObservedJwkFetcher<'a, EthApi>
+where
+    EthApi: EthCall,
+    EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
+{
+    /// Fetch ALL oracle data (JWKs + blockchain events) and return as BCS-encoded ObservedJWKs
+    fn fetch(&self, block_id: BlockId) -> Option<Bytes> {
+        let mut all_entries: Vec<ProviderJWKs> = Vec::new();
+
+        // 1. Fetch JWKs from JWKManager
+        if let Some(jwk_entries) = self.fetch_jwk_manager_providers(block_id) {
+            all_entries.extend(jwk_entries);
+        }
+
+        // 2. Fetch blockchain events from NativeOracle (for configured chains from
+        //    OracleTaskConfig)
+        let blockchain_entries = self.fetch_blockchain_providers(block_id);
+        all_entries.extend(blockchain_entries);
+
+        info!(
+            jwk_count = all_entries.iter().filter(|e| e.issuer.starts_with(b"https://")).count(),
+            blockchain_count =
+                all_entries.iter().filter(|e| e.issuer.starts_with(b"gravity://")).count(),
+            "Fetched all oracle providers"
+        );
+
+        // 3. Build and BCS-encode ObservedJWKs
+        let api_all_providers =
+            gravity_api_types::on_chain_config::jwks::AllProvidersJWKs { entries: all_entries };
+
+        let observed_jwks =
+            gravity_api_types::on_chain_config::jwks::ObservedJWKs { jwks: api_all_providers };
+
+        Some(bcs::to_bytes(&observed_jwks).expect("Failed to BCS serialize ObservedJWKs").into())
     }

     fn contract_address() -> Address {
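The `gravity://` issuers counted above follow a chain-id-first convention; a self-contained sketch of that parse (mirroring `parse_chain_id_from_issuer` in `jwk_oracle.rs`, with an illustrative URI):

```rust
fn main() {
    // Blockchain tasks use gravity://{chain_id}/..., JWK providers use https://.
    fn chain_id_of(issuer: &str) -> Option<u32> {
        issuer.strip_prefix("gravity://")?.split('/').next()?.parse().ok()
    }
    assert_eq!(chain_id_of("gravity://31337/events?address=0xe7f1"), Some(31337));
    assert_eq!(chain_id_of("https://accounts.google.com"), None);
}
```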
diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/oracle_task_helpers.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/oracle_task_helpers.rs
new file mode 100644
index 0000000000..3076445e58
--- /dev/null
+++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/oracle_task_helpers.rs
@@ -0,0 +1,270 @@
+//! Shared Oracle Task Configuration helpers
+//!
+//! This module provides common ABI definitions and helper functions for interacting
+//! with OracleTaskConfig and NativeOracle contracts. Used by both:
+//! - `jwk_consensus_config.rs` (for JWK consensus provider discovery)
+//! - `observed_jwk.rs` (for observed JWK data fetching)
+
+use super::{
+    base::OnchainConfigFetcher, NATIVE_ORACLE_ADDR, ORACLE_TASK_CONFIG_ADDR, SYSTEM_CALLER,
+};
+use alloy_eips::BlockId;
+use alloy_primitives::{Bytes, B256, U256};
+use alloy_rpc_types_eth::TransactionRequest;
+use alloy_sol_macro::sol;
+use alloy_sol_types::SolCall;
+use reth_rpc_eth_api::{helpers::EthCall, RpcTypes};
+use tracing::{info, warn};
+
+// =============================================================================
+// Constants
+// =============================================================================
+
+/// Source type for blockchain events in NativeOracle
+pub const SOURCE_TYPE_BLOCKCHAIN: u32 = 0;
+
+// Re-export SOURCE_TYPE_JWK from types for consistency
+pub use super::types::SOURCE_TYPE_JWK;
+
+// =============================================================================
+// Shared ABI Definitions
+// =============================================================================
+
+sol! {
+    // -------------------- OracleTaskConfig Types --------------------
+
+    /// Configuration for a continuous oracle task
+    struct OracleTask {
+        bytes config;
+        uint64 updatedAt;
+    }
+
+    /// Get all registered source IDs for a given source type
+    function getSourceIds(
+        uint32 sourceType
+    ) external view returns (uint256[] memory sourceIds);
+
+    /// Get all task names for a (sourceType, sourceId) pair
+    function getTaskNames(
+        uint32 sourceType,
+        uint256 sourceId
+    ) external view returns (bytes32[] memory taskNames);
+
+    /// Get an oracle task by its key tuple
+    function getTask(
+        uint32 sourceType,
+        uint256 sourceId,
+        bytes32 taskName
+    ) external view returns (OracleTask memory task);
+
+    // -------------------- NativeOracle Types --------------------
+
+    /// Get the latest nonce for a source (used to determine current progress)
+    function getLatestNonce(
+        uint32 sourceType,
+        uint256 sourceId
+    ) external view returns (uint128 nonce);
+}
+
+// =============================================================================
+// OracleTaskClient - Shared Helper for Contract Calls
+// =============================================================================
+
+/// Client for interacting with OracleTaskConfig and NativeOracle contracts
+#[derive(Debug)]
+pub struct OracleTaskClient<'a, EthApi> {
+    base_fetcher: &'a OnchainConfigFetcher<EthApi>,
+}
+
+impl<'a, EthApi> OracleTaskClient<'a, EthApi>
+where
+    EthApi: EthCall,
+    EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
+{
+    pub const fn new(base_fetcher: &'a OnchainConfigFetcher<EthApi>) -> Self {
+        Self { base_fetcher }
+    }
+
+    /// Call OracleTaskConfig.getTask()
+    pub fn call_get_task(
+        &self,
+        source_type: u32,
+        source_id: U256,
+        task_name: B256,
+        block_id: BlockId,
+    ) -> Option<OracleTask> {
+        let call =
+            getTaskCall { sourceType: source_type, sourceId: source_id, taskName: task_name };
+        let input: Bytes = call.abi_encode().into();
+
+        let result = self
+            .base_fetcher
+            .eth_call(SYSTEM_CALLER, ORACLE_TASK_CONFIG_ADDR, input, block_id)
+            .ok()?;
+
+        getTaskCall::abi_decode_returns(&result).ok()
+    }
+
+    /// Call OracleTaskConfig.getTaskNames()
+    pub fn call_get_task_names(
+        &self,
+        source_type: u32,
+        source_id: U256,
+        block_id: BlockId,
+    ) -> Option<Vec<B256>> {
+        let call = getTaskNamesCall { sourceType: source_type, sourceId: source_id };
+        let input: Bytes = call.abi_encode().into();
+
+        let result = self
+            .base_fetcher
+            .eth_call(SYSTEM_CALLER, ORACLE_TASK_CONFIG_ADDR, input, block_id)
+            .ok()?;
+
+        getTaskNamesCall::abi_decode_returns(&result).ok()
+    }
+
+    /// Fetch registered source IDs from OracleTaskConfig for a given source type
+    pub fn fetch_registered_source_ids(
+        &self,
+        source_type: u32,
+        block_id: BlockId,
+    ) -> Option<Vec<U256>> {
+        let call = getSourceIdsCall { sourceType: source_type };
+        let input: Bytes = call.abi_encode().into();
+
+        let result = self
+            .base_fetcher
+            .eth_call(SYSTEM_CALLER, ORACLE_TASK_CONFIG_ADDR, input, block_id)
+            .ok()?;
+
+        getSourceIdsCall::abi_decode_returns(&result).ok()
+    }
+
+    /// Call NativeOracle.getLatestNonce() to get current progress
+    pub fn call_get_latest_nonce(
+        &self,
+        source_type: u32,
+        source_id: U256,
+        block_id: BlockId,
+    ) -> Option<u128> {
+        let call = getLatestNonceCall { sourceType: source_type, sourceId: source_id };
+        let input: Bytes = call.abi_encode().into();
+
+        let result =
+            self.base_fetcher.eth_call(SYSTEM_CALLER, NATIVE_ORACLE_ADDR, input, block_id).ok()?;
+
+        getLatestNonceCall::abi_decode_returns(&result).ok()
+    }
+
+    /// Fetch blockchain task URIs with their nonces
+    ///
+    /// Returns a vector of (URI, nonce) tuples for all configured blockchain tasks.
+    /// Returns tasks even when nonce is 0 (no data recorded yet) to enable discovery.
+    pub fn fetch_blockchain_task_uris(&self, block_id: BlockId) -> Vec<(String, u128)> {
+        let mut results = Vec::new();
+
+        // Get all registered blockchain source IDs
+        let source_ids =
+            self.fetch_registered_source_ids(SOURCE_TYPE_BLOCKCHAIN, block_id).unwrap_or_default();
+
+        info!(
+            target: "oracle_task_helper",
+            length = source_ids.len(),
+            "oracle task source ids length"
+        );
+
+        for source_id in source_ids {
+            self.process_source_tasks(source_id, block_id, &mut results);
+        }
+
+        results
+    }
+
+    /// Process all tasks for a single source ID
+    fn process_source_tasks(
+        &self,
+        source_id: U256,
+        block_id: BlockId,
+        results: &mut Vec<(String, u128)>,
+    ) {
+        // Fetch the latest nonce for this source (0 if no data recorded yet)
+        let nonce =
+            self.call_get_latest_nonce(SOURCE_TYPE_BLOCKCHAIN, source_id, block_id).unwrap_or(0);
+
+        info!(
+            target: "oracle_task_helper",
+            source_id = source_id.to_string(),
+            nonce,
+            "oracle task source id and nonce"
+        );
+
+        let Some(task_names) =
+            self.call_get_task_names(SOURCE_TYPE_BLOCKCHAIN, source_id, block_id)
+        else {
+            return;
+        };
+
+        info!(
+            target: "oracle_task_helper",
+            length = task_names.len(),
+            "oracle task task names length"
+        );
+
+        for task_name in task_names {
+            self.process_single_task(source_id, task_name, nonce, block_id, results);
+        }
+    }
+
+    /// Process a single task and add valid URI to results
+    fn process_single_task(
+        &self,
+        source_id: U256,
+        task_name: B256,
+        nonce: u128,
+        block_id: BlockId,
+        results: &mut Vec<(String, u128)>,
+    ) {
+        info!(
+            target: "oracle_task_helper",
+            task_name = task_name.to_string(),
+            "oracle task task name"
+        );
+
+        let Some(task) = self.call_get_task(SOURCE_TYPE_BLOCKCHAIN, source_id, task_name, block_id)
+        else {
+            return;
+        };
+
+        info!(
+            target: "oracle_task_helper",
+            task = task.config.to_string(),
+            "oracle task task"
+        );
+
+        if task.config.is_empty() {
+            return;
+        }
+
+        let uri_string = String::from_utf8_lossy(&task.config).to_string();
+        info!(
+            target: "oracle_task_helper",
+            uri_string = uri_string,
+            "oracle task uri string"
+        );
+
+        // Validate URI
+        match reth_pipe_exec_layer_relayer::uri_parser::parse_oracle_uri(&uri_string) {
+            Ok(_) => {
+                results.push((uri_string, nonce));
+            }
+            Err(e) => {
+                warn!(
+                    target: "oracle_task_helper",
+                    uri_string = uri_string,
+                    error = %e,
+                    "Failed to parse oracle URI"
+                );
+            }
+        }
+    }
+}
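For reference, the calldata produced by the `getTask` call above is a 4-byte selector plus three statically encoded words; a sketch with a simplified signature (return type omitted here, since returns don't affect calldata):

```rust
use alloy_primitives::{keccak256, U256};
use alloy_sol_macro::sol;
use alloy_sol_types::SolCall;

sol! {
    function getTask(uint32 sourceType, uint256 sourceId, bytes32 taskName) external view;
}

fn main() {
    // The well-known JWK task key: (sourceType=1, sourceId=0, keccak256("oidc_providers")).
    let call = getTaskCall {
        sourceType: 1,
        sourceId: U256::ZERO,
        taskName: keccak256("oidc_providers"),
    };
    assert_eq!(call.abi_encode().len(), 4 + 3 * 32);
}
```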
diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/types.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/types.rs
index 39a65870c5..557ad3a1af 100644
--- a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/types.rs
+++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/types.rs
@@ -39,6 +39,13 @@ sol! {
     // Function from ValidatorManagement contract
     function getActiveValidators() external view returns (ValidatorConsensusInfo[] memory);

+    // Functions to get pending validators (now return full ValidatorConsensusInfo[])
+    function getPendingActiveValidators() external view returns (ValidatorConsensusInfo[] memory);
+    function getPendingInactiveValidators() external view returns (ValidatorConsensusInfo[] memory);
+
+    // Function to get total voting power directly from contract
+    function getTotalVotingPower() external view returns (uint256);
+
     /// NewEpochEvent from Reconfiguration.sol
     /// Emitted when epoch transition completes with full validator set
     event NewEpochEvent(
@@ -106,20 +113,105 @@ pub fn convert_validator_consensus_info(
     )
 }

-/// Convert array of ValidatorConsensusInfo to BCS-encoded ValidatorSet
+/// Convert arrays of ValidatorConsensusInfo to BCS-encoded ValidatorSet
 /// Used by ValidatorSetFetcher to convert getActiveValidators() response
-pub fn convert_active_validators_to_bcs(validators: &[ValidatorConsensusInfo]) -> Bytes {
+///
+/// # Arguments
+/// * `active_validators` - Active validators from getActiveValidators()
+/// * `pending_active` - Validators pending activation (optional, from getCurValidatorConsensusInfos
+///   style query)
+/// * `pending_inactive` - Validators pending deactivation (still in active set for this epoch)
+pub fn convert_validators_to_bcs(
+    active_validators: &[ValidatorConsensusInfo],
+    pending_active: &[ValidatorConsensusInfo],
+    pending_inactive: &[ValidatorConsensusInfo],
+) -> Bytes {
+    // Calculate total voting power from active validators (in Ether units)
     let total_voting_power: u128 =
-        validators.iter().map(|v| wei_to_ether(v.votingPower).to::<u128>()).sum();
+        active_validators.iter().map(|v| wei_to_ether(v.votingPower).to::<u128>()).sum();
+
+    // Calculate total joining power from pending_active validators
+    let total_joining_power: u128 =
+        pending_active.iter().map(|v| wei_to_ether(v.votingPower).to::<u128>()).sum();

     let gravity_validator_set = GravityValidatorSet {
-        active_validators: validators.iter().map(convert_validator_consensus_info).collect(),
-        pending_inactive: vec![], // Not returned by getActiveValidators()
-        pending_active: vec![], // Not returned by getActiveValidators()
+        active_validators: active_validators.iter().map(convert_validator_consensus_info).collect(),
+        pending_inactive: pending_inactive.iter().map(convert_validator_consensus_info).collect(),
+        pending_active: pending_active.iter().map(convert_validator_consensus_info).collect(),
         total_voting_power,
-        total_joining_power: 0, // Not returned by getActiveValidators()
+        total_joining_power,
     };

     // Serialize to BCS format (gravity-aptos standard)
     bcs::to_bytes(&gravity_validator_set).expect("Failed to serialize validator set").into()
 }
+
+/// Legacy function for backward compatibility - only active validators
+/// Used when we don't have pending validator info available
+pub fn convert_active_validators_to_bcs(validators: &[ValidatorConsensusInfo]) -> Bytes {
+    convert_validators_to_bcs(validators, &[], &[])
+}
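The voting-power sums above are in whole Ether; assuming `wei_to_ether` (defined elsewhere in this file) is integer division by 10^18, truncation behaves like this sketch:

```rust
use alloy_primitives::U256;

fn main() {
    // Assumption: wei_to_ether truncates toward zero, so dust below 1 ETH
    // per validator does not contribute to total_voting_power.
    fn wei_to_ether(wei: U256) -> U256 {
        wei / U256::from(10).pow(U256::from(18))
    }
    let stake = U256::from(32) * U256::from(10).pow(U256::from(18)); // 32 ETH in Wei
    let total: u128 = [stake, stake].iter().map(|v| wei_to_ether(*v).to::<u128>()).sum();
    assert_eq!(total, 64);
}
```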
+
+// =============================================================================
+// Oracle JWK Types (shared by jwk_oracle.rs and observed_jwk.rs)
+// =============================================================================
+
+/// Source type for JWK in NativeOracle
+pub const SOURCE_TYPE_JWK: u32 = 1;
+
+sol! {
+    /// RSA JWK structure from JWKManager contract
+    struct OracleRSA_JWK {
+        string kid;
+        string kty;
+        string alg;
+        string e;
+        string n;
+    }
+
+    /// Provider's JWK collection from JWKManager
+    struct OracleProviderJWKs {
+        bytes issuer;
+        uint64 version;
+        OracleRSA_JWK[] jwks;
+    }
+
+    /// All providers' JWK collection from JWKManager
+    struct OracleAllProvidersJWKs {
+        OracleProviderJWKs[] entries;
+    }
+
+    /// JWKManager.getObservedJWKs()
+    function getObservedJWKs() external view returns (OracleAllProvidersJWKs memory);
+
+    /// Event emitted when JWKs are updated
+    event ObservedJWKsUpdated(uint256 indexed epoch, OracleProviderJWKs[] jwks);
+}
+
+/// RSA JWK fields for BCS serialization - matches gravity-aptos struct order
+#[derive(serde::Serialize, serde::Deserialize)]
+pub struct GaptosRsaJwk {
+    pub kid: String,
+    pub kty: String,
+    pub alg: String,
+    pub e: String,
+    pub n: String,
+}
+
+/// Convert Oracle RSA_JWK to api-types JWKStruct
+pub fn convert_oracle_rsa_to_api_jwk(
+    rsa_jwk: OracleRSA_JWK,
+) -> gravity_api_types::on_chain_config::jwks::JWKStruct {
+    let gaptos_rsa = GaptosRsaJwk {
+        kid: rsa_jwk.kid,
+        kty: rsa_jwk.kty,
+        alg: rsa_jwk.alg,
+        e: rsa_jwk.e,
+        n: rsa_jwk.n,
+    };
+
+    gravity_api_types::on_chain_config::jwks::JWKStruct {
+        type_name: "0x1::jwks::RSA_JWK".to_string(),
+        data: bcs::to_bytes(&gaptos_rsa).expect("Failed to BCS serialize RSA_JWK"),
+    }
+}
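Why the field order of `GaptosRsaJwk` is load-bearing: BCS has no field names, so declaration order alone determines the bytes. A small demonstration:

```rust
fn main() {
    // Two structs holding identical data serialize differently when the
    // declaration order differs - which is why GaptosRsaJwk must match the
    // gravity-aptos field order exactly.
    #[derive(serde::Serialize)]
    struct KidFirst { kid: String, kty: String }
    #[derive(serde::Serialize)]
    struct KtyFirst { kty: String, kid: String }

    let a = bcs::to_bytes(&KidFirst { kid: "k1".into(), kty: "RSA".into() }).unwrap();
    let b = bcs::to_bytes(&KtyFirst { kty: "RSA".into(), kid: "k1".into() }).unwrap();
    assert_ne!(a, b); // same data, different bytes
}
```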
diff --git a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/validator_set.rs b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/validator_set.rs
index 14316a0754..457fcd99e2 100644
--- a/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/validator_set.rs
+++ b/crates/pipe-exec-layer-ext-v2/execute/src/onchain_config/validator_set.rs
@@ -2,7 +2,10 @@
 use super::{
     base::{ConfigFetcher, OnchainConfigFetcher},
-    types::{convert_active_validators_to_bcs, getActiveValidatorsCall},
+    types::{
+        convert_validators_to_bcs, getActiveValidatorsCall, getPendingActiveValidatorsCall,
+        getPendingInactiveValidatorsCall,
+    },
     SYSTEM_CALLER, VALIDATOR_MANAGER_ADDR,
 };
 use alloy_eips::BlockId;
@@ -11,8 +14,6 @@ use alloy_rpc_types_eth::TransactionRequest;
 use alloy_sol_types::SolCall;
 use reth_rpc_eth_api::{helpers::EthCall, RpcTypes};

-// BCS for serialization
-
 /// Fetcher for validator set information
 #[derive(Debug)]
 pub struct ValidatorSetFetcher<'a, EthApi> {
@@ -35,24 +36,65 @@ where
     EthApi::NetworkTypes: RpcTypes<TransactionRequest = TransactionRequest>,
 {
     fn fetch(&self, block_id: BlockId) -> Option<Bytes> {
-        // Use new getActiveValidators() function
-        let call = getActiveValidatorsCall {};
-        let input: Bytes = call.abi_encode().into();
+        // 1. Fetch active validators
+        let active_validators = {
+            let call = getActiveValidatorsCall {};
+            let input: Bytes = call.abi_encode().into();
+            let result = self
+                .base_fetcher
+                .eth_call(Self::caller_address(), Self::contract_address(), input, block_id)
+                .map_err(|e| {
+                    tracing::warn!(
+                        "Failed to fetch active validators at block {}: {:?}",
+                        block_id,
+                        e
+                    );
+                })
+                .ok()?;
+            getActiveValidatorsCall::abi_decode_returns(&result)
+                .expect("Failed to decode getActiveValidators return value")
+        };

-        let result = self
-            .base_fetcher
-            .eth_call(Self::caller_address(), Self::contract_address(), input, block_id)
-            .map_err(|e| {
-                tracing::warn!("Failed to fetch validator set at block {}: {:?}", block_id, e);
-            })
-            .ok()?;
+        // 2. Fetch pending active validators
+        let pending_active = {
+            let call = getPendingActiveValidatorsCall {};
+            let input: Bytes = call.abi_encode().into();
+            let result = self
+                .base_fetcher
+                .eth_call(Self::caller_address(), Self::contract_address(), input, block_id)
+                .map_err(|e| {
+                    tracing::warn!(
+                        "Failed to fetch pending active validators at block {}: {:?}",
+                        block_id,
+                        e
+                    );
+                })
+                .ok()?;
+            getPendingActiveValidatorsCall::abi_decode_returns(&result)
+                .expect("Failed to decode getPendingActiveValidators return value")
+        };

-        // Decode the response as ValidatorConsensusInfo[]
-        let validators = getActiveValidatorsCall::abi_decode_returns(&result)
-            .expect("Failed to decode getActiveValidators return value");
+        // 3. Fetch pending inactive validators
+        let pending_inactive = {
+            let call = getPendingInactiveValidatorsCall {};
+            let input: Bytes = call.abi_encode().into();
+            let result = self
+                .base_fetcher
+                .eth_call(Self::caller_address(), Self::contract_address(), input, block_id)
+                .map_err(|e| {
+                    tracing::warn!(
+                        "Failed to fetch pending inactive validators at block {}: {:?}",
+                        block_id,
+                        e
+                    );
+                })
+                .ok()?;
+            getPendingInactiveValidatorsCall::abi_decode_returns(&result)
+                .expect("Failed to decode getPendingInactiveValidators return value")
+        };

-        // Convert to BCS-encoded ValidatorSet format
-        Some(convert_active_validators_to_bcs(&validators))
+        // Convert to BCS-encoded ValidatorSet format with all validator lists
+        Some(convert_validators_to_bcs(&active_validators, &pending_active, &pending_inactive))
     }

     fn contract_address() -> Address {
diff --git a/crates/pipe-exec-layer-ext-v2/execute/tests/gravity_genesis_test.rs b/crates/pipe-exec-layer-ext-v2/execute/tests/gravity_genesis_test.rs
index 11cd853af0..f41e53853f 100644
--- a/crates/pipe-exec-layer-ext-v2/execute/tests/gravity_genesis_test.rs
+++ b/crates/pipe-exec-layer-ext-v2/execute/tests/gravity_genesis_test.rs
@@ -119,16 +119,7 @@ pub fn convert_validator_info(
     let account_address =
         gravity_api_types::u256_define::AccountAddress::from_bytes(&solidity_info.aptosAddress);

-    GravityValidatorInfo::new(
-        account_address,
-        solidity_info.votingPower.to::<u64>(),
-        ValidatorConfig::new(
-            solidity_info.consensusPublicKey.clone().into(),
-            solidity_info.validatorNetworkAddresses.clone().into(),
-            solidity_info.fullnodeNetworkAddresses.clone().into(),
-            solidity_info.validatorIndex.to::<u64>(),
-        ),
-    )
+    todo!()
 }

 fn new_system_call_txn(contract: Address, input: Bytes) -> TxEnv {
diff --git a/crates/pipe-exec-layer-ext-v2/relayer/Cargo.toml b/crates/pipe-exec-layer-ext-v2/relayer/Cargo.toml
index 39ab43b014..24f334f40b 100644
--- a/crates/pipe-exec-layer-ext-v2/relayer/Cargo.toml
+++ b/crates/pipe-exec-layer-ext-v2/relayer/Cargo.toml
@@ -6,33 +6,32 @@ homepage.workspace = true
license.workspace
= true repository.workspace = true rust-version.workspace = true -description = "replayer for pipeline execution layer extension" +description = "Oracle data source relayer for Gravity chain" [lints] workspace = true [dependencies] -gravity-api-types.workspace = true -reth-primitives.workspace = true -reth-tracing.workspace = true +# Alloy primitives and types alloy-primitives.workspace = true -once_cell.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time"] } - -alloy-network.workspace = true -alloy-provider.workspace = true alloy-rpc-types.workspace = true +alloy-sol-macro.workspace = true +alloy-sol-types.workspace = true +alloy-provider.workspace = true alloy-transport.workspace = true +alloy-network.workspace = true + +# Gravity API types (reuse types from gaptos) +gravity-api-types.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time"] } +async-trait.workspace = true + +# Error handling and utilities anyhow.workspace = true -serde.workspace = true tracing.workspace = true -url.workspace = true reqwest = { workspace = true, features = ["rustls-tls"] } -alloy-sol-macro.workspace = true -alloy-sol-types.workspace = true - -serde_json.workspace = true +url.workspace = true [dev-dependencies] -alloy-sol-macro.workspace = true alloy-sol-types.workspace = true +hex = "0.4" diff --git a/crates/pipe-exec-layer-ext-v2/relayer/examples/new_usage.rs b/crates/pipe-exec-layer-ext-v2/relayer/examples/new_usage.rs deleted file mode 100644 index 4a2bd72038..0000000000 --- a/crates/pipe-exec-layer-ext-v2/relayer/examples/new_usage.rs +++ /dev/null @@ -1,96 +0,0 @@ -#![allow(missing_docs)] - -use reth_pipe_exec_layer_relayer::{RelayerManager, UriParser}; -use reth_tracing::{LayerInfo, LogFormat, RethTracer, Tracer}; -use std::time::Duration; -use tracing::{info, level_filters::LevelFilter}; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // Initialize tracing - let layer = LayerInfo::new( - LogFormat::Terminal, - LevelFilter::INFO.to_string(), - "trace".to_string(), - None, - ); - let tracer = RethTracer::new().with_stdout(layer); - - tracer.init().unwrap(); - - // 1. Create ETH client - // let rpc_url = "https://ethereum-holesky-rpc.publicnode.com"; - let rpc_url = "http://localhost:8848"; - - // 3. Create RelayerManager - let manager = RelayerManager::new(); - - // 6. 
Add multiple URIs - each will get its own relayer instance - let uris = vec![ - // Monitor latest block on mainnet - // "gravity://mainnet/block?strategy=head", - - // Monitor USDC Transfer events - // "gravity://mainnet/event?address=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - - "gravity://31337/event?address=0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512&topic0=0x3915136b10c16c5f181f4774902f3baf9e44a5f700cabf5c826ee1caed313624" - - // Monitor storage slot on a contract - // "gravity://mainnet/storage?account=0x123456789abcdef123456789abcdef1234567890&slot=0x0", - - // Monitor ERC20 transfers for a specific address - // "gravity://mainnet/account/0x123456789abcdef123456789abcdef1234567890/activity?type=erc20_transfer", - ]; - - // Add each URI (creates separate relayer for each) - for uri in &uris { - match manager.add_uri(uri, rpc_url, 0).await { - Ok(()) => info!("Successfully added URI: {}", uri), - Err(e) => info!("Failed to add URI {}: {}", uri, e), - } - } - - for uri in &uris { - match manager.poll_uri(uri).await { - Ok(state) => info!("Successfully polled URI: {} -> {:?}", uri, state), - Err(e) => info!("Failed to poll URI {}: {}", uri, e), - } - } - - // 8. Let it run for a while - info!("Relayers are running. Press Ctrl+C to stop..."); - tokio::time::sleep(Duration::from_secs(60)).await; - - info!("Shutdown complete!"); - Ok(()) -} - -/// Example showing how to use the new UriParser -#[allow(dead_code)] -fn parser_example() -> anyhow::Result<()> { - let parser = UriParser::new(); - - // New URI format examples - let uris = vec![ - "gravity://mainnet/block?strategy=head", - "gravity://mainnet/event?address=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "gravity://mainnet/storage?account=0x123456789abcdef123456789abcdef1234567890&slot=0x0", - "gravity://mainnet/account/0x123456789abcdef123456789abcdef1234567890/activity?type=erc20_transfer", - ]; - - for uri in uris { - match parser.parse(uri) { - Ok(task) => { - info!( - "Parsed URI: {} -> Chain: {}, Task: {:?}", - uri, task.chain_specifier, task.task - ); - } - Err(e) => { - info!("Failed to parse URI {}: {}", uri, e); - } - } - } - - Ok(()) -} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/blockchain_source.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/blockchain_source.rs new file mode 100644 index 0000000000..9c99c6cdd7 --- /dev/null +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/blockchain_source.rs @@ -0,0 +1,540 @@ +//! Blockchain Event Source +//! +//! Monitors cross-chain events from EVM chains, specifically GravityPortal.MessageSent. + +use crate::{ + data_source::{source_types, OracleData, OracleDataSource}, + eth_client::EthHttpCli, +}; +use alloy_primitives::{Address, Bytes, U256}; +use alloy_rpc_types::Filter; +use alloy_sol_macro::sol; +use alloy_sol_types::SolEvent; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; + +// GravityPortal.MessageSent event signature +sol! { + /// MessageSent(uint128 indexed nonce, bytes payload) + event MessageSent(uint128 indexed nonce, bytes payload); +} + +/// Blockchain event source for monitoring GravityPortal.MessageSent events +/// +/// This is the primary data source for cross-chain message bridging. +/// Tracks `last_returned_nonce` to ensure exactly-once consumption semantics. 
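The doc comment above is the crux of the new design: each MessageSent nonce must be surfaced to the caller at most once. A minimal sketch of that rule in isolation, using a hypothetical free function that is not part of this patch:

```rust
// Hypothetical sketch of the exactly-once rule described above; not part of this patch.
// Only events with a nonce strictly greater than the watermark are surfaced,
// and the watermark then advances to the highest surfaced nonce.
fn take_new_events(
    last_returned_nonce: &mut u128,
    polled: Vec<(u128, Vec<u8>)>, // (nonce, payload) pairs decoded from logs
) -> Vec<(u128, Vec<u8>)> {
    let fresh: Vec<_> =
        polled.into_iter().filter(|(nonce, _)| *nonce > *last_returned_nonce).collect();
    if let Some(max) = fresh.iter().map(|(nonce, _)| *nonce).max() {
        *last_returned_nonce = max;
    }
    fresh
}
```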
+#[derive(Debug)] +pub struct BlockchainEventSource { + /// Chain ID (sourceId in Oracle terms) + chain_id: u64, + + /// Ethereum RPC client + rpc_client: Arc<EthHttpCli>, + + /// GravityPortal contract address + portal_address: Address, + + /// Current block cursor for polling + cursor: AtomicU64, + + /// Last nonce we returned to caller (for exactly-once tracking) + last_returned_nonce: Mutex<u128>, +} + +impl BlockchainEventSource { + /// Maximum blocks to poll in one request + const MAX_BLOCKS_PER_POLL: u64 = 100; + + /// Chunk size for backward search (stays within RPC limits) + const DISCOVERY_CHUNK_SIZE: u64 = 500; + + /// Create a new BlockchainEventSource with auto-discovery + /// + /// # Arguments + /// * `chain_id` - The EVM chain ID + /// * `rpc_url` - RPC endpoint URL + /// * `portal_address` - GravityPortal contract address + /// * `config_start_block` - Static start block from config (fallback) + /// * `latest_onchain_nonce` - The latest nonce recorded on Gravity chain + pub async fn new_with_discovery( + chain_id: u64, + rpc_url: &str, + portal_address: Address, + config_start_block: u64, + latest_onchain_nonce: u128, + ) -> Result<Self> { + let rpc_client = Arc::new(EthHttpCli::new(rpc_url)?); + + // Auto-discover start block based on latest_onchain_nonce + let start_block = if latest_onchain_nonce > 0 { + match Self::discover_start_block( + &rpc_client, + portal_address, + latest_onchain_nonce, + config_start_block, + ) + .await + { + Ok(block) => { + info!( + target: "blockchain_source", + chain_id, + latest_nonce = latest_onchain_nonce, + discovered_block = block, + "Auto-discovered start block from event log" + ); + block + } + Err(e) => { + // Fallback to config_start_block if discovery fails + warn!( + target: "blockchain_source", + chain_id, + error = ?e, + config_start_block, + "Failed to discover start block, using config start block" + ); + config_start_block + } + } + } else { + // Cold start (nonce 0): use config + info!( + target: "blockchain_source", + chain_id, + config_start_block, + "Cold start (nonce 0), using config start block" + ); + config_start_block + }; + + info!( + target: "blockchain_source", + chain_id = chain_id, + portal_address = ?portal_address, + final_start_block = start_block, + latest_onchain_nonce = latest_onchain_nonce, + "Created BlockchainEventSource" + ); + + Ok(Self { + chain_id, + rpc_client, + portal_address, + cursor: AtomicU64::new(start_block), + last_returned_nonce: Mutex::new(latest_onchain_nonce), + }) + } + + /// Discover the block number where the event with `target_nonce` occurred + /// + /// Uses chunked backward search from finalized block to config_start_block. + /// Each query is limited to DISCOVERY_CHUNK_SIZE blocks to stay within RPC limits.
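Before the implementation below, it may help to see the window arithmetic of that backward search on its own. A self-contained illustration (the chunk size mirrors DISCOVERY_CHUNK_SIZE; the helper itself is hypothetical):

```rust
// Hypothetical illustration of the chunked backward walk; not part of this patch.
// Produces (from_block, to_block) windows from the finalized head down to the
// configured start block, e.g. finalized=1200, start=0, chunk=500 yields
// (700, 1200), (200, 700), (0, 200).
fn backward_windows(finalized: u64, start: u64, chunk: u64) -> Vec<(u64, u64)> {
    let mut windows = Vec::new();
    let mut to_block = finalized;
    while to_block > start {
        let from_block = to_block.saturating_sub(chunk).max(start);
        windows.push((from_block, to_block));
        to_block = from_block;
    }
    windows
}
```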
+ async fn discover_start_block( + client: &EthHttpCli, + address: Address, + target_nonce: u128, + config_start_block: u64, + ) -> Result<u64> { + let finalized = client.get_finalized_block_number().await?; + + if finalized <= config_start_block { + return Err(anyhow!( + "Finalized block {} <= config_start_block {}", + finalized, + config_start_block + )); + } + + let nonce_topic = U256::from(target_nonce); + let mut to_block = finalized; + let mut chunks_searched = 0u32; + + // Search backwards in chunks + while to_block > config_start_block { + let from_block = + to_block.saturating_sub(Self::DISCOVERY_CHUNK_SIZE).max(config_start_block); + + let filter = Filter::new() + .address(address) + .event_signature(MessageSent::SIGNATURE_HASH) + .topic1(nonce_topic) + .from_block(from_block) + .to_block(to_block); + + match client.get_logs(&filter).await { + Ok(logs) => { + if let Some(log) = logs.first() { + if let Some(block_number) = log.block_number { + info!( + target: "blockchain_source", + target_nonce, + block_number, + chunks_searched, + "Found target nonce in event log" + ); + return Ok(block_number); + } + } + } + Err(e) => { + warn!( + target: "blockchain_source", + from_block, + to_block, + error = ?e, + "RPC get_logs failed, continuing search" + ); + } + } + + to_block = from_block; + chunks_searched += 1; + + // Log progress for long searches + if chunks_searched % 10 == 0 { + debug!( + target: "blockchain_source", + chunks_searched, + current_block = to_block, + "Still searching for nonce..." + ); + } + } + + Err(anyhow!( + "Event with nonce {} not found in range [{}, {}]", + target_nonce, + config_start_block, + finalized + )) + } + + /// Legacy constructor kept for compatibility (calls new_with_discovery with nonce 0) + pub async fn new( + chain_id: u64, + rpc_url: &str, + portal_address: Address, + start_block: u64, + ) -> Result<Self> { + Self::new_with_discovery(chain_id, rpc_url, portal_address, start_block, 0).await + } + + /// Create from config (legacy) + pub async fn from_config(source_id: U256, config: &[u8]) -> Result<Self> { + use alloy_sol_types::SolValue; + + let decoded: (String, Address, u64) = <(String, Address, u64)>::abi_decode(config) + .map_err(|e| anyhow!("Failed to decode config: {}", e))?; + + let (rpc_url, portal_address, start_block) = decoded; + let chain_id = source_id.try_into().map_err(|_| anyhow!("Chain ID too large"))?; + + Self::new(chain_id, &rpc_url, portal_address, start_block).await + } + + /// Get the current block cursor position + pub fn cursor(&self) -> u64 { + self.cursor.load(Ordering::Relaxed) + } + + /// Get the last nonce we returned (for exactly-once tracking) + pub async fn last_nonce(&self) -> Option<u128> { + let n = *self.last_returned_nonce.lock().await; + if n > 0 { + Some(n) + } else { + None + } + } + + /// Set the last nonce (used when initializing from on-chain state) + pub async fn set_last_nonce(&self, nonce: u128) { + *self.last_returned_nonce.lock().await = nonce; + } +} + +#[async_trait] +impl OracleDataSource for BlockchainEventSource { + fn source_type(&self) -> u32 { + source_types::BLOCKCHAIN + } + + fn source_id(&self) -> U256 { + U256::from(self.chain_id) + } + + async fn poll(&self) -> Result<Vec<OracleData>> { + let cursor = self.cursor.load(Ordering::Relaxed); + let finalized_block = self.rpc_client.get_finalized_block_number().await?; + let to_block = std::cmp::min(cursor + Self::MAX_BLOCKS_PER_POLL, finalized_block); + + if to_block <= cursor { + return Ok(vec![]); + } + + let filter = Filter::new() + .address(self.portal_address) +
.event_signature(MessageSent::SIGNATURE_HASH) + .from_block(cursor + 1) + .to_block(to_block); + + debug!( + target: "blockchain_source", + chain_id = self.chain_id, + from_block = cursor + 1, + to_block = to_block, + "Polling for MessageSent events" + ); + + let logs = self.rpc_client.get_logs(&filter).await?; + let mut results = Vec::with_capacity(logs.len()); + + // Filter events strictly greater than last_returned_nonce + let last_nonce = *self.last_returned_nonce.lock().await; + + for log in logs { + let nonce = if let Some(nonce_topic) = log.topics().get(1) { + let nonce_bytes = &nonce_topic.as_slice()[16..32]; + u128::from_be_bytes(nonce_bytes.try_into().unwrap_or_default()) + } else { + continue; + }; + + // Strictly monotonic check: ignore events we've already processed + if nonce <= last_nonce { + continue; + } + + let payload = log.data().data.clone(); + + debug!( + target: "blockchain_source", + chain_id = self.chain_id, + nonce = nonce, + payload_len = payload.len(), + "Found new MessageSent event" + ); + + results.push(OracleData { nonce, payload: Bytes::from(payload.to_vec()) }); + } + + // Update cursor + self.cursor.store(to_block, Ordering::Relaxed); + + // Track max nonce for exactly-once semantics + if !results.is_empty() { + let max_nonce = results.iter().map(|d| d.nonce).max().unwrap(); + *self.last_returned_nonce.lock().await = max_nonce; + } + + let current_last_nonce = self.last_nonce().await; + info!( + target: "blockchain_source", + chain_id = self.chain_id, + events_found = results.len(), + new_cursor = to_block, + last_nonce = ?current_last_nonce, + "Poll completed" + ); + + Ok(results) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data_source::OracleDataSource; + + // ========================================================================= + // Fixed Anvil Deployment Addresses + // ========================================================================= + // When deploying on a fresh Anvil with account 0 (0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266), + // contracts are deployed deterministically based on nonce: + // - Nonce 0: MockGToken -> 0x5FbDB2315678afecb367f032d93F642f64180aa3 + // - Nonce 1: GravityPortal -> 0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512 + // - Nonce 2: GBridgeSender -> 0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0 + // + // To run this test: + // 1. cd /home/jingyue/projects/gravity_chain_core_contracts + // 2. ./scripts/start_anvil.sh # Deploy contracts + // 3. ./scripts/bridge_test.sh # Generate MessageSent event + // 4. 
cargo test --package reth-pipe-exec-layer-relayer test_poll_anvil_events -- --ignored + // --nocapture + // ========================================================================= + + /// GravityPortal address on local Anvil (deterministic, nonce 1) + const ANVIL_PORTAL_ADDRESS: &str = "0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512"; + + /// GBridgeSender address on local Anvil (deterministic, nonce 2) + const ANVIL_SENDER_ADDRESS: &str = "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0"; + + /// Anvil RPC URL + const ANVIL_RPC_URL: &str = "http://localhost:8546"; + + /// Local Anvil chain ID + const ANVIL_CHAIN_ID: u64 = 31337; + + /// PortalMessage format decoder + /// Payload format: sender (20 bytes) || nonce (16 bytes) || message (variable) + fn decode_portal_message(payload: &[u8]) -> Option<(Address, u128, Vec<u8>)> { + if payload.len() < 36 { + return None; + } + + // The payload is ABI-encoded as `bytes`, so first decode the outer wrapper + // ABI encoding: offset (32 bytes) || length (32 bytes) || data + if payload.len() < 64 { + return None; + } + + // Read offset and length + let offset = u64::from_be_bytes(payload[24..32].try_into().ok()?) as usize; + if offset + 32 > payload.len() { + return None; + } + + let length = + u64::from_be_bytes(payload[offset + 24..offset + 32].try_into().ok()?) as usize; + let data_start = offset + 32; + if data_start + length > payload.len() { + return None; + } + + let inner_data = &payload[data_start..data_start + length]; + + // Now parse PortalMessage: sender (20) || nonce (16) || message + if inner_data.len() < 36 { + return None; + } + + let sender = Address::from_slice(&inner_data[0..20]); + let nonce = u128::from_be_bytes(inner_data[20..36].try_into().ok()?); + let message = inner_data[36..].to_vec(); + + Some((sender, nonce, message)) + } + + /// Decode bridge message: abi.encode(amount, recipient) + fn decode_bridge_message(message: &[u8]) -> Option<(U256, Address)> { + if message.len() < 64 { + return None; + } + + let amount = U256::from_be_slice(&message[0..32]); + let recipient = Address::from_slice(&message[44..64]); + + Some((amount, recipient)) + } + + #[tokio::test] + #[ignore] // Requires a running Anvil with contracts deployed (see setup steps above) + async fn test_poll_anvil_events() { + use crate::eth_client::EthHttpCli; + use std::sync::Arc; + + // Parse portal address + let portal_address: Address = ANVIL_PORTAL_ADDRESS.parse().expect("Invalid portal address"); + + println!("=== BlockchainEventSource Test ==="); + println!("RPC URL: {}", ANVIL_RPC_URL); + println!("Portal Address: {}", portal_address); + println!("Chain ID: {}", ANVIL_CHAIN_ID); + println!(); + + // First, debug the RPC client to check block numbers + let rpc_client = EthHttpCli::new(ANVIL_RPC_URL).expect("Failed to create RPC client"); + + // Check what finalized block returns + match rpc_client.get_finalized_block_number().await { + Ok(finalized) => println!("DEBUG: Finalized block: {}", finalized), + Err(e) => println!("DEBUG: get_finalized_block_number failed: {:?}", e), + } + + // Create source with cold start (nonce 0) + let source = BlockchainEventSource::new( + ANVIL_CHAIN_ID, + ANVIL_RPC_URL, + portal_address, + 0, // start from block 0 + ) + .await + .expect("Failed to create BlockchainEventSource"); + + println!(); + println!("Created BlockchainEventSource"); + println!(" Source Type: {}", source.source_type()); + println!(" Source ID: {}", source.source_id()); + println!(" Initial Cursor: {}", source.cursor()); + println!(); + + // Poll for events + println!("Polling for MessageSent events..."); + let events =
source.poll().await.expect("Failed to poll"); + + println!("After poll - Cursor: {}", source.cursor()); + println!("Found {} events", events.len()); + println!(); + + for (i, event) in events.iter().enumerate() { + println!("=== Event {} ===", i + 1); + println!("Nonce: {}", event.nonce); + println!("Payload length: {} bytes", event.payload.len()); + println!("Raw payload: 0x{}", hex::encode(&event.payload)); + println!(); + + // Decode the PortalMessage + if let Some((sender, msg_nonce, message)) = decode_portal_message(&event.payload) { + println!("Decoded PortalMessage:"); + println!(" Sender: {}", sender); + println!(" Message Nonce: {}", msg_nonce); + println!(" Message length: {} bytes", message.len()); + println!(" Message: 0x{}", hex::encode(&message)); + + // Decode the bridge message + if let Some((amount, recipient)) = decode_bridge_message(&message) { + println!(); + println!("Decoded Bridge Message:"); + println!(" Amount: {} wei ({} G)", amount, amount / U256::from(10u64.pow(18))); + println!(" Recipient: {}", recipient); + } + } else { + println!("Failed to decode PortalMessage"); + } + println!(); + } + + // Verify we got at least one event if bridge_test.sh was run + if !events.is_empty() { + println!("✓ Successfully polled and decoded events!"); + + let first_event = &events[0]; + assert!(!first_event.payload.is_empty(), "Payload should not be empty"); + } else { + println!("No events found. Make sure to run:"); + println!(" 1. ./scripts/start_anvil.sh"); + println!(" 2. ./scripts/bridge_test.sh"); + } + } + + #[tokio::test] + #[ignore] // Requires Anvil running + async fn test_source_creation() { + let portal_address: Address = ANVIL_PORTAL_ADDRESS.parse().unwrap(); + + let source = + BlockchainEventSource::new(ANVIL_CHAIN_ID, ANVIL_RPC_URL, portal_address, 0).await; + + assert!(source.is_ok(), "Should create source successfully"); + + let source = source.unwrap(); + assert_eq!(source.source_type(), source_types::BLOCKCHAIN); + assert_eq!(source.source_id(), U256::from(ANVIL_CHAIN_ID)); + } +} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/data_source.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/data_source.rs new file mode 100644 index 0000000000..1cc9fdd709 --- /dev/null +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/data_source.rs @@ -0,0 +1,83 @@ +//! Oracle Data Source Abstraction +//! +//! This module provides the core trait and enum for all oracle data sources. +//! The extensible design allows adding new source types without modifying existing code. + +use alloy_primitives::{Bytes, U256}; +use anyhow::Result; +use async_trait::async_trait; + +use crate::blockchain_source::BlockchainEventSource; + +/// Data returned by oracle data sources +/// +/// This is the unified format for all data that flows into NativeOracle. +#[derive(Debug, Clone)] +pub struct OracleData { + /// Strictly increasing nonce for this (sourceType, sourceId) pair + /// - For Blockchain: MessageSent.nonce + pub nonce: u128, + + /// ABI-encoded payload to be stored in NativeOracle + pub payload: Bytes, +} + +/// Trait for all oracle data sources +/// +/// Each implementation represents a specific type of data source that validators +/// can poll for oracle data. 
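To make that contract concrete before the trait itself (declared just below), here is a hypothetical in-memory implementor of the kind a unit test might use; it assumes this module's imports and none of these names exist in the patch:

```rust
// Hypothetical test stub: hands out a queued batch once, then empty batches.
// Assumes the OracleData / OracleDataSource / source_types items of this module.
struct StaticSource {
    id: u64,
    pending: std::sync::Mutex<Vec<OracleData>>,
}

#[async_trait]
impl OracleDataSource for StaticSource {
    fn source_type(&self) -> u32 {
        source_types::BLOCKCHAIN
    }
    fn source_id(&self) -> U256 {
        U256::from(self.id)
    }
    async fn poll(&self) -> Result<Vec<OracleData>> {
        // Drain the queue; no await occurs while the lock is held.
        Ok(std::mem::take(&mut *self.pending.lock().unwrap()))
    }
}
```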
+#[async_trait] +pub trait OracleDataSource: Send + Sync { + /// Get the source type (corresponds to NativeOracle.sourceType) + /// - 0: BLOCKCHAIN + fn source_type(&self) -> u32; + + /// Get the source ID (corresponds to NativeOracle.sourceId) + /// - For BLOCKCHAIN: chain ID + fn source_id(&self) -> U256; + + /// Poll for new data + /// + /// Returns a list of (nonce, payload) pairs that should be recorded in NativeOracle. + /// The implementation should track its own cursor to avoid returning duplicates. + async fn poll(&self) -> Result<Vec<OracleData>>; +} + +/// Source type constants (matching NativeOracle) +pub mod source_types { + /// Blockchain cross-chain events (e.g., GravityPortal.MessageSent) + pub const BLOCKCHAIN: u32 = 0; +} + +/// Extensible enum for runtime dispatch of data sources +/// +/// New source types can be added by: +/// 1. Adding a new variant here +/// 2. Implementing the source struct +/// 3. Adding a case in DataSourceFactory +#[derive(Debug)] +pub enum DataSourceKind { + /// Blockchain cross-chain events (sourceType=0) + Blockchain(BlockchainEventSource), +} + +#[async_trait] +impl OracleDataSource for DataSourceKind { + fn source_type(&self) -> u32 { + match self { + DataSourceKind::Blockchain(_) => source_types::BLOCKCHAIN, + } + } + + fn source_id(&self) -> U256 { + match self { + DataSourceKind::Blockchain(s) => s.source_id(), + } + } + + async fn poll(&self) -> Result<Vec<OracleData>> { + match self { + DataSourceKind::Blockchain(s) => s.poll().await, + } + } +} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/eth_client.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/eth_client.rs index dd4b9601d7..f67073389f 100644 --- a/crates/pipe-exec-layer-ext-v2/relayer/src/eth_client.rs +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/eth_client.rs @@ -1,34 +1,12 @@ use alloy_network::Ethereum; -use alloy_primitives::{Address, B256, U256}; use alloy_provider::{Provider, ProviderBuilder, RootProvider}; use alloy_rpc_types::{Filter, Log}; use anyhow::{Context as AnyhowContext, Result}; use reqwest::ClientBuilder; -use std::{sync::Arc, time::Instant}; use tokio::time::{sleep, Duration}; use tracing::{debug, warn}; use url::Url; -/// Provider performance metrics -#[derive(Debug, Default, Clone)] -pub struct ProviderMetrics { - /// Number of requests sent - pub requests_sent: u64, - /// Number of successful requests - pub requests_succeeded: u64, - /// Number of failed requests - pub requests_failed: u64, - /// Total latency time (milliseconds) - pub total_latency_ms: u64, -} -/// Ethereum transaction sender, providing reliable communication with nodes -#[derive(Clone, Debug)] -pub struct EthHttpCli { - provider: RootProvider<Ethereum>, - metrics: Arc<tokio::sync::Mutex<ProviderMetrics>>, - retry_config: RetryConfig, -} - /// Retry configuration #[derive(Debug, Clone)] pub struct RetryConfig { @@ -53,6 +31,13 @@ impl Default for RetryConfig { } } +/// Ethereum HTTP client for RPC communication +#[derive(Clone, Debug)] +pub struct EthHttpCli { + provider: RootProvider<Ethereum>, + retry_config: RetryConfig, +} + impl EthHttpCli { /// Creates a new EthHttpCli instance /// @@ -67,218 +52,44 @@ impl EthHttpCli { pub fn new(rpc_url: &str) -> Result<Self> { debug!("Creating EthHttpCli for URL: {}", rpc_url); - // Parse URL with error handling let url = Url::parse(rpc_url).with_context(|| format!("Failed to parse RPC URL: {}", rpc_url))?; - // Build HTTP client with error handling let client_builder = ClientBuilder::new().no_proxy().use_rustls_tls(); let client = client_builder.build().with_context(|| "Failed to build HTTP client")?; let provider: RootProvider<Ethereum> =
ProviderBuilder::default().connect_reqwest(client, url.clone()); - Ok(Self { - provider, - metrics: Arc::new(tokio::sync::Mutex::new(ProviderMetrics::default())), - retry_config: RetryConfig::default(), - }) - } - - /// Gets the nonce (transaction count) for a given address - /// - /// # Arguments - /// * `address` - The Ethereum address to get the nonce for - /// - /// # Returns - /// * `Result<u64>` - The nonce value or error - /// - /// # Errors - /// * Returns an error if the request times out or fails - pub async fn get_nonce(&self, address: Address) -> Result<u64> { - tokio::time::timeout(Duration::from_secs(10), async { - let nonce = self.provider.get_transaction_count(address).await?; - Ok(nonce) - }) - .await? - } - - /// Verify network connection - #[allow(unused)] - async fn verify_connection(&self) -> Result<()> { - self.get_block_number().await.map(|_| ()) - } - - /// Get account transaction count (nonce) - pub async fn get_transaction_count(&self, address: Address) -> Result<u64> { - let start = Instant::now(); - - let result = self - .retry_with_backoff(|| async { self.provider.get_transaction_count(address).await }) - .await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - - result - .with_context(|| format!("Failed to get transaction count for address: {:?}", address)) + Ok(Self { provider, retry_config: RetryConfig::default() }) } - /// Get account balance - pub async fn get_balance(&self, address: &Address) -> Result<U256> { - let start = Instant::now(); - - let result = - self.retry_with_backoff(|| async { self.provider.get_balance(*address).await }).await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - - result.with_context(|| format!("Failed to get balance for address: {:?}", address)) - } - - /// Get event logs - supports complete Filter object + /// Get event logs with the specified filter pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> { - let start = Instant::now(); - - let result = - self.retry_with_backoff(|| async { self.provider.get_logs(filter).await }).await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - - result.with_context(|| "Failed to get logs with filter") - } - - /// Gets the storage value at a specific slot for a given address - /// - /// # Arguments - /// * `address` - The contract address to query - /// * `slot` - The storage slot to read from - /// - /// # Returns - /// * `Result<B256>` - The storage value or error - /// - /// # Errors - /// * Returns an error if the storage query fails - pub async fn get_storage_at(&self, address: Address, slot: B256) -> Result<B256> { - let start = Instant::now(); - - let result = self - .retry_with_backoff(|| async { - self.provider.get_storage_at(address, slot.into()).await - }) - .await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - result.map(|v| v.into()).with_context(|| { - format!("Failed to get storage at address: {:?}, slot: {:?}", address, slot) - }) - } - - /// Gets a block by its number - /// - /// # Arguments - /// * `block_number` - The block number to retrieve - /// - /// # Returns - /// * `Result<Option<Block>>` - The block data or None if not found - /// - /// # Errors - /// * Returns an error if the block query fails - pub async fn get_block(&self, block_number: u64) -> Result<Option<Block>> { - let start = Instant::now(); - - let result = self - .retry_with_backoff(|| async { - self.provider - .get_block_by_number(alloy_rpc_types::BlockNumberOrTag::Number(block_number)) - .await - }) - .await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - -
result.with_context(|| format!("Failed to get block: {}", block_number)) + self.retry_with_backoff(|| async { self.provider.get_logs(filter).await }) + .await + .with_context(|| "Failed to get logs with filter") } /// Gets the latest finalized block number - /// - /// # Returns - /// * `Result<u64>` - The finalized block number or error - /// - /// # Errors - /// * Returns an error if no finalized block is found or the query fails pub async fn get_finalized_block_number(&self) -> Result<u64> { - let start = Instant::now(); - - let result = self - .retry_with_backoff(|| async { - match self - .provider - .get_block_by_number(alloy_rpc_types::BlockNumberOrTag::Finalized) - .await? - { - Some(block) => Ok(block.header.number), - None => Err(alloy_transport::TransportError::UnsupportedFeature( - "No finalized block found".into(), - )), - } - }) - .await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - - result.with_context(|| "Failed to get finalized block number") - } - - /// Gets the current gas price - /// - /// # Returns - /// * `Result<u128>` - The gas price in wei or error - /// - /// # Errors - /// * Returns an error if the gas price query fails - #[allow(unused)] - pub async fn get_gas_price(&self) -> Result<u128> { - let start = Instant::now(); - - let result = - self.retry_with_backoff(|| async { self.provider.get_gas_price().await }).await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - - result - .map_err(|e| anyhow::anyhow!("Failed to get gas price: {:?}", e)) - .with_context(|| "Failed to get gas price") - } - - /// Gets the latest block number - /// - /// # Returns - /// * `Result<u64>` - The latest block number or error - /// - /// # Errors - /// * Returns an error if the block number query fails - #[allow(unused)] - pub async fn get_block_number(&self) -> Result<u64> { - let start = Instant::now(); - - let result = - self.retry_with_backoff(|| async { self.provider.get_block_number().await }).await; - - self.update_metrics(result.is_ok(), start.elapsed()).await; - - result.with_context(|| "Failed to get block number") + self.retry_with_backoff(|| async { + match self + .provider + .get_block_by_number(alloy_rpc_types::BlockNumberOrTag::Finalized) + .await?
+ { + Some(block) => Ok(block.header.number), + None => Err(alloy_transport::TransportError::UnsupportedFeature( + "No finalized block found".into(), + )), + } + }) + .await + .with_context(|| "Failed to get finalized block number") } /// Retries an operation with exponential backoff - /// - /// # Arguments - /// * `operation` - The async operation to retry - /// - /// # Returns - /// * `Result<T>` - The result of the operation or error after all retries - /// - /// # Errors - /// * Returns an error if all retry attempts fail async fn retry_with_backoff<T, F, Fut>(&self, mut operation: F) -> Result<T> where F: FnMut() -> Fut, @@ -323,69 +134,4 @@ impl EthHttpCli { last_error )) } - - /// Updates performance metrics with the result of an operation - /// - /// # Arguments - /// * `success` - Whether the operation was successful - /// * `latency` - The duration of the operation - async fn update_metrics(&self, success: bool, latency: Duration) { - let mut metrics = self.metrics.lock().await; - metrics.requests_sent += 1; - - if success { - metrics.requests_succeeded += 1; - } else { - metrics.requests_failed += 1; - } - - // Ensure at least 1ms latency is recorded to avoid 0 latency in very fast environments - let latency_ms = std::cmp::max(1, latency.as_millis() as u64); - metrics.total_latency_ms += latency_ms; - } - - /// Gets a copy of the current performance metrics - /// - /// # Returns - /// * `ProviderMetrics` - A copy of the current metrics - #[allow(unused)] - pub async fn get_metrics(&self) -> ProviderMetrics { - self.metrics.lock().await.clone() - } - - /// Gets the average latency in milliseconds - /// - /// # Returns - /// * `f64` - The average latency in milliseconds, or 0.0 if no requests have been made - #[allow(unused)] - pub async fn get_average_latency_ms(&self) -> f64 { - let metrics = self.metrics.lock().await; - if metrics.requests_sent > 0 { - metrics.total_latency_ms as f64 / metrics.requests_sent as f64 - } else { - 0.0 - } - } - - /// Gets the success rate as a percentage - /// - /// # Returns - /// * `f64` - The success rate as a decimal (0.0 to 1.0), or 0.0 if no requests have been made - #[allow(unused)] - pub async fn get_success_rate(&self) -> f64 { - let metrics = self.metrics.lock().await; - if metrics.requests_sent > 0 { - metrics.requests_succeeded as f64 / metrics.requests_sent as f64 - } else { - 0.0 - } - } - - /// Resets all performance metrics to their default values - #[allow(unused)] - pub async fn reset_metrics(&self) { - let mut metrics = self.metrics.lock().await; - *metrics = ProviderMetrics::default(); - debug!("TxnSender metrics reset"); - } } diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/factory.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/factory.rs new file mode 100644 index 0000000000..d91422d660 --- /dev/null +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/factory.rs @@ -0,0 +1,87 @@ +//! Data Source Factory +//! +//! Creates data sources from OracleTaskConfig.config bytes based on sourceType. + +use crate::{ + blockchain_source::BlockchainEventSource, + data_source::{source_types, DataSourceKind}, +}; +use alloy_primitives::{Bytes, U256}; +use anyhow::{anyhow, Result}; + +/// Factory for creating data sources from chain configuration +/// +/// This factory dispatches to the appropriate source implementation based on +/// the sourceType from OracleTaskConfig.
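The config bytes the factory consumes are produced off-chain; for sourceType=0 they are the ABI-encoded (string rpcUrl, address portal, uint64 startBlock) tuple that BlockchainEventSource::from_config decodes. A hedged sketch of the producing side (endpoint and address are placeholders; the unit test below exercises the same encoding):

```rust
// Hypothetical producer of the config bytes consumed by the factory below;
// mirrors the tuple layout that BlockchainEventSource::from_config abi_decodes.
use alloy_primitives::{address, Bytes};
use alloy_sol_types::SolValue;

fn blockchain_task_config() -> Bytes {
    let cfg = (
        "https://rpc.example.com".to_string(), // placeholder RPC endpoint
        address!("e7f1725E7734CE288F8367e1Bb143E90bb3F0512"), // placeholder portal
        0u64, // start block
    );
    Bytes::from(cfg.abi_encode())
}
```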
+#[derive(Debug, Clone, Copy, Default)] +pub struct DataSourceFactory; + +impl DataSourceFactory { + /// Create a data source from chain configuration + /// + /// # Arguments + /// * `source_type` - The source type (0=BLOCKCHAIN, etc.) + /// * `source_id` - The source identifier (chain ID, etc.) + /// * `config` - ABI-encoded configuration from OracleTaskConfig + /// + /// # Returns + /// * `Result<DataSourceKind>` - The created data source or an error + pub async fn create( + source_type: u32, + source_id: U256, + config: Bytes, + ) -> Result<DataSourceKind> { + match source_type { + source_types::BLOCKCHAIN => { + let source = BlockchainEventSource::from_config(source_id, &config).await?; + Ok(DataSourceKind::Blockchain(source)) + } + _ => Err(anyhow!("Unknown source type: {}", source_type)), + } + } +} + +#[cfg(test)] +mod tests { + use crate::OracleDataSource; + + use super::*; + use alloy_primitives::address; + use alloy_sol_types::SolValue; + + #[tokio::test] + async fn test_create_blockchain_source() { + // Encode config: (rpcUrl, portalAddress, startBlock) + let config = ( + "https://rpc.example.com".to_string(), + address!("5FbDB2315678afecb367f032d93F642f64180aa3"), + 0u64, + ) + .abi_encode(); + + let result = DataSourceFactory::create( + source_types::BLOCKCHAIN, + U256::from(1), // Ethereum chain ID + Bytes::from(config), + ) + .await; + + assert!(result.is_ok()); + let source = result.unwrap(); + assert_eq!(source.source_type(), source_types::BLOCKCHAIN); + assert_eq!(source.source_id(), U256::from(1)); + } + + #[tokio::test] + async fn test_create_unknown_source_type() { + let result = DataSourceFactory::create( + 99, // Unknown type + U256::from(1), + Bytes::new(), + ) + .await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Unknown source type")); + } +} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/lib.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/lib.rs index 8c12ffa084..90e56f085c 100644 --- a/crates/pipe-exec-layer-ext-v2/relayer/src/lib.rs +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/lib.rs @@ -1,19 +1,44 @@ //! Gravity Protocol Relayer //! -//! This crate provides functionality for parsing Gravity protocol URIs and relaying blockchain -//! events. +//! This crate provides functionality for monitoring external data sources and relaying +//! oracle data to the Gravity chain. +//! +//! ## Architecture +//! +//! The architecture uses a trait + enum pattern for extensibility: +//! - `OracleDataSource` trait defines the interface +//! - `DataSourceKind` enum for runtime dispatch +//! 
- `OracleRelayerManager` for managing sources by URI + +// ============================================================ +// Modules +// ============================================================ + +/// Core data source abstraction +pub mod data_source; + +/// Blockchain event source (GravityPortal.MessageSent) +pub mod blockchain_source; + +/// Factory for creating data sources +pub mod factory; + +/// Oracle relayer manager (URI-keyed interface) +pub mod oracle_manager; + +/// URI parser for extended gravity:// scheme +pub mod uri_parser; /// Ethereum HTTP client functionality pub mod eth_client; -/// Relayer manager for coordinating multiple relayers -pub mod manager; -/// Core relayer implementation for gravity protocol tasks -pub mod relayer; -/// URI parser for gravity protocol tasks -pub mod parser; +// ============================================================ +// Re-exports +// ============================================================ +pub use blockchain_source::BlockchainEventSource; +pub use data_source::{source_types, DataSourceKind, OracleData, OracleDataSource}; pub use eth_client::EthHttpCli; -pub use manager::{ManagerStats, RelayerManager}; -pub use parser::{AccountActivityType, GravityTask, ParsedTask, UriParser}; -pub use relayer::{GravityRelayer, ObserveState, ObservedValue, DEPOSIT_GRAVITY_EVENT_SIGNATURE}; +pub use factory::DataSourceFactory; +pub use oracle_manager::{JWKStruct, OracleRelayerManager, PollResult}; +pub use uri_parser::{parse_oracle_uri, ParsedOracleTask}; diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/manager.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/manager.rs deleted file mode 100644 index 8c14488af0..0000000000 --- a/crates/pipe-exec-layer-ext-v2/relayer/src/manager.rs +++ /dev/null @@ -1,103 +0,0 @@ -//! Relayer Manager for lifecycle management - -use crate::{parser::UriParser, relayer::GravityRelayer}; -use anyhow::{anyhow, Result}; -use gravity_api_types::relayer::PollResult; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::RwLock; -use tracing::{debug, info}; - -/// Manages multiple Gravity relayers and their lifecycle -/// -/// This struct provides centralized management for multiple relayers, -/// allowing addition, removal, and polling of URIs across different RPC endpoints. -#[derive(Debug)] -pub struct RelayerManager { - uri_parser: UriParser, - relayers: Arc<RwLock<HashMap<String, Arc<GravityRelayer>>>>, -} - -impl RelayerManager { - /// Creates a new RelayerManager instance - /// - /// Returns a new RelayerManager with an empty relayer map and a new URI parser.
- pub fn new() -> Self { - Self { uri_parser: UriParser::new(), relayers: Arc::new(RwLock::new(HashMap::new())) } - } - - /// Adds a new URI to be monitored by the relayer - /// - /// # Arguments - /// * `uri` - The gravity protocol URI to monitor - /// * `rpc_url` - The RPC endpoint URL for the blockchain - /// * `last_state` - The last observed state to start monitoring from - /// - /// # Returns - /// * `Result<()>` - Success or error if the URI cannot be added - /// - /// # Errors - /// * Returns an error if the RPC URL is already being monitored - /// * Returns an error if the URI cannot be parsed - /// * Returns an error if the relayer cannot be created - pub async fn add_uri(&self, uri: &str, rpc_url: &str, from_block: u64) -> Result<()> { - { - let relayers = self.relayers.read().await; - if relayers.contains_key(rpc_url) { - return Err(anyhow!("RPC URL {} is already being monitored", rpc_url)); - } - } - - let task = self.uri_parser.parse(uri)?; - info!("Adding URI: {} -> {:?}", uri, task); - - let relayer = GravityRelayer::new(rpc_url, task, from_block).await?; - info!("Successfully added URI: {}, relayer: {:?}", uri, relayer); - - let mut relayers = self.relayers.write().await; - relayers.insert(uri.to_string(), Arc::new(relayer)); - Ok(()) - } - - /// Polls a specific URI for updates - /// - /// # Arguments - /// * `uri` - The URI to poll for updates - /// - /// # Returns - /// * `Result<PollResult>` - The poll result containing JWK structures and max block number - /// - /// # Errors - /// * Returns an error if the URI is not found in the managed relayers - pub async fn poll_uri(&self, uri: &str) -> Result<PollResult> { - let relayers = { self.relayers.read().await }; - let relayer = - relayers.get(uri).ok_or(anyhow!("URI {} not found, relayers: {:?}", uri, relayers))?; - let poll_result = relayer.poll_once().await?; - let jwk_struct = - GravityRelayer::convert_specific_observed_value(poll_result.observed_state.clone()) - .await?; - Ok(PollResult { - jwk_structs: jwk_struct, - max_block_number: poll_result.max_queried_block, - updated: poll_result.updated, - }) - } -} - -/// Statistics for the relayer manager -#[derive(Debug, Clone)] -pub struct ManagerStats { - /// Total number of URIs being monitored - pub total_uris: usize, - /// Number of active URIs - pub active_uris: usize, -} - -/// Implements Drop trait for graceful shutdown -impl Drop for RelayerManager { - fn drop(&mut self) { - // Note: Cannot use async code in Drop trait - // This is just for logging, actual cleanup should be done in graceful_shutdown - debug!("RelayerManager is being dropped"); - } -} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/oracle_manager.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/oracle_manager.rs new file mode 100644 index 0000000000..370fcc869f --- /dev/null +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/oracle_manager.rs @@ -0,0 +1,164 @@ +//! Oracle Relayer Manager +//! +//! Manages oracle data sources keyed by URI, matching the gaptos JWKObserver interface.
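A hedged usage sketch of the interface this file defines: register a URI once, warm-started from the latest on-chain nonce, then poll it on a loop. The URI grammar lives in uri_parser.rs, which this hunk does not show, so the string is treated as an opaque input here:

```rust
// Hypothetical driver; not part of this patch. Assumes only the add_uri/poll_uri
// API defined below and leaves the gravity:// URI format to uri_parser.rs.
async fn observe(
    manager: &OracleRelayerManager,
    uri: &str,
    rpc_url: &str,
    latest_onchain_nonce: u128,
) -> anyhow::Result<()> {
    manager.add_uri(uri, rpc_url, latest_onchain_nonce).await?;
    loop {
        let result = manager.poll_uri(uri).await?;
        if result.updated {
            tracing::info!(items = result.jwk_structs.len(), nonce = ?result.nonce, "new oracle data");
        }
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
}
```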
+ +use crate::{ + blockchain_source::BlockchainEventSource, + data_source::{source_types, DataSourceKind, OracleDataSource}, + uri_parser::{parse_oracle_uri, ParsedOracleTask}, +}; +use anyhow::{anyhow, Result}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; +use tracing::{debug, info}; + +// Re-export types from gravity-api-types for external use +pub use gravity_api_types::{on_chain_config::jwks::JWKStruct, relayer::PollResult}; + +/// Oracle Relayer Manager +/// +/// Manages data sources keyed by URI for per-observer polling. +#[derive(Debug, Default)] +pub struct OracleRelayerManager { + /// Data sources keyed by URI + sources: RwLock<HashMap<String, Arc<DataSourceKind>>>, +} + +impl OracleRelayerManager { + /// Create a new OracleRelayerManager + pub fn new() -> Self { + Self { sources: RwLock::new(HashMap::new()) } + } + + /// Add a source by URI with on-chain nonce for warm-start + /// + /// # Arguments + /// * `uri` - The oracle task URI + /// * `rpc_url` - RPC endpoint URL + /// * `latest_onchain_nonce` - Latest nonce from NativeOracle (for warm-start) + pub async fn add_uri( + &self, + uri: &str, + rpc_url: &str, + latest_onchain_nonce: u128, + ) -> Result<()> { + { + let sources = self.sources.read().await; + if sources.contains_key(uri) { + info!(target: "oracle_manager", uri = uri, "Source already exists, skipping"); + return Ok(()); + } + } + + let task = parse_oracle_uri(uri)?; + + // Create source with the provided nonce for warm-start + let source = self.create_source_from_task(&task, rpc_url, latest_onchain_nonce).await?; + + info!( + target: "oracle_manager", + uri = uri, + source_type = task.source_type, + source_id = task.source_id, + latest_onchain_nonce = latest_onchain_nonce, + "Added data source" + ); + + let mut sources = self.sources.write().await; + sources.insert(uri.to_string(), Arc::new(source)); + Ok(()) + } + + async fn create_source_from_task( + &self, + task: &ParsedOracleTask, + rpc_url: &str, + latest_onchain_nonce: u128, + ) -> Result<DataSourceKind> { + match task.source_type { + source_types::BLOCKCHAIN => { + let portal_address = task.portal_address()?; + let config_start_block = task.from_block(); + + // Use new_with_discovery for robust initialization + let source = BlockchainEventSource::new_with_discovery( + task.source_id, + rpc_url, + portal_address, + config_start_block, + latest_onchain_nonce, + ) + .await?; + + Ok(DataSourceKind::Blockchain(source)) + } + _ => Err(anyhow!("Unknown source type: {}", task.source_type)), + } + } + + /// Poll a source by URI + pub async fn poll_uri(&self, uri: &str) -> Result<PollResult> { + let sources = self.sources.read().await; + let source = sources.get(uri).ok_or_else(|| anyhow!("Source not found: {}", uri))?; + + let data = source.poll().await?; + + let jwk_structs: Vec<JWKStruct> = data + .iter() + .map(|d| JWKStruct { + type_name: source.source_type().to_string(), + data: d.payload.to_vec(), + }) + .collect(); + + // Get nonce and cursor - use last_nonce for exactly-once semantics + let (nonce, max_block_number) = match source.as_ref() { + DataSourceKind::Blockchain(s) => (s.last_nonce().await.map(|n| n as u64), s.cursor()), + }; + + let updated = !data.is_empty(); + + debug!( + target: "oracle_manager", + uri = uri, + num_items = data.len(), + max_block = max_block_number, + nonce = ?nonce, + updated = updated, + "Poll completed" + ); + + Ok(PollResult { jwk_structs, max_block_number, nonce, updated }) + } + + /// Remove a source by URI + pub async fn remove_uri(&self, uri: &str) -> Option<Arc<DataSourceKind>> { + self.sources.write().await.remove(uri) + } + + /// Get
the number of registered sources + pub async fn source_count(&self) -> usize { + self.sources.read().await.len() + } + + /// Check if a source exists by URI + pub async fn has_uri(&self, uri: &str) -> bool { + self.sources.read().await.contains_key(uri) + } + + /// List all registered URIs + pub async fn list_uris(&self) -> Vec<String> { + self.sources.read().await.keys().cloned().collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_manager_creation() { + let manager = OracleRelayerManager::new(); + assert_eq!(manager.source_count().await, 0); + } +} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/parser.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/parser.rs deleted file mode 100644 index 18ed9c44d5..0000000000 --- a/crates/pipe-exec-layer-ext-v2/relayer/src/parser.rs +++ /dev/null @@ -1,430 +0,0 @@ -//! URI parser for gravity protocol tasks - -use alloy_primitives::{Address, B256}; -use alloy_rpc_types::{BlockNumberOrTag, Filter, Topic}; -use anyhow::{anyhow, Result}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use url::Url; - -/// Defines supported task type enumeration -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum GravityTask { - /// Monitor event task, contains a Filter object that can be directly used with Alloy - MonitorEvent(Filter), - /// Monitor block head task - MonitorBlockHead, - /// Monitor storage slot task - MonitorStorage { - /// Account address to monitor storage for - account: Address, - /// Storage slot to monitor - slot: B256, - }, - /// Monitor account activity task (abstract layer) - MonitorAccount { - /// Address of the account to monitor - address: Address, - /// Type of activity to monitor for this account - activity_type: AccountActivityType, - }, -} - -/// Account activity types -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum AccountActivityType { - /// ERC20 token transfer - Erc20Transfer, - /// All transactions - AllTransactions, -} - -/// Represents a parsed gravity protocol task -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ParsedTask { - /// The parsed gravity task to be executed - pub task: GravityTask, - /// The original URI string that was parsed - pub original_uri: String, - /// The chain identifier (e.g., "mainnet", "testnet") - pub chain_specifier: String, -} - -/// URI parser for gravity protocol tasks -/// -/// This struct provides functionality to parse gravity protocol URIs -/// into structured task objects that can be executed by the relayer. -#[derive(Debug, Default)] -pub struct UriParser; - -impl UriParser { - /// Creates a new UriParser instance - /// - /// # Returns - /// * `UriParser` - A new URI parser instance - pub fn new() -> Self { - Self - } - - /// Parse gravity URI - /// - /// Supported new formats: - /// - gravity://mainnet/block?strategy=head - Monitor latest block - /// - gravity://mainnet/event?address=0x...&topic0=0x... - Monitor events - /// - gravity://mainnet/storage?account=0x...&slot=0x...
- Monitor storage slot - /// - gravity://mainnet/account/0x.../activity?type=erc20_transfer - Monitor account activity - pub fn parse(&self, uri_str: &str) -> Result<ParsedTask> { - let uri = Url::parse(uri_str)?; - - if uri.scheme() != "gravity" { - return Err(anyhow!("Invalid scheme: expected 'gravity'")); - } - - let chain_specifier = - uri.host_str().ok_or_else(|| anyhow!("Missing chain specifier in URI"))?.to_string(); - - let path = uri.path(); - let params: HashMap<_, _> = uri.query_pairs().into_owned().collect(); - - let task = match path { - "/event" => self.parse_event_task(&params)?, - "/block" => self.parse_block_task(&params)?, - "/storage" => self.parse_storage_task(&params)?, - path if path.starts_with("/account/") => self.parse_account_task(path, &params)?, - _ => return Err(anyhow!("Unsupported resource path: {}", path)), - }; - - Ok(ParsedTask { task, original_uri: uri_str.to_string(), chain_specifier }) - } - - /// Parses event monitoring task parameters - /// - /// # Arguments - /// * `params` - Query parameters containing event filter configuration - /// - /// # Returns - /// * `Result<GravityTask>` - The parsed event monitoring task or error - /// - /// # Errors - /// * Returns an error if required parameters are missing or invalid - fn parse_event_task(&self, params: &HashMap<String, String>) -> Result<GravityTask> { - let mut filter = Filter::new(); - - if let Some(address_str) = params.get("address") { - let address: Address = address_str - .parse() - .map_err(|e| anyhow!("Invalid address '{}': {}", address_str, e))?; - filter = filter.address(address); - } - - let topics: Result<Vec<Topic>, _> = (0..4) - .filter_map(|i| { - let topic_key = format!("topic{}", i); - params.get(&topic_key).map(|topic_val_str| { - topic_val_str - .split(',') - .map(|s| s.trim().parse::<B256>()) - .collect::<Result<Vec<B256>, _>>() - .map(Topic::from) - .map_err(|e| anyhow!("Invalid topic{} value '{}': {}", i, topic_val_str, e)) - }) - }) - .collect(); - - filter = topics?.into_iter().enumerate().fold(filter, |filter, (i, topic)| match i { - 0 => filter.event_signature(topic), - 1 => filter.topic1(topic), - 2 => filter.topic2(topic), - 3 => filter.topic3(topic), - _ => filter, - }); - - // Can add more filter conditions, such as fromBlock, toBlock, etc.
- if let Some(from_block_str) = params.get("fromBlock") { - if from_block_str == "latest" { - filter = filter.from_block(BlockNumberOrTag::Latest); - } else if from_block_str == "earliest" { - filter = filter.from_block(BlockNumberOrTag::Earliest); - } else if from_block_str == "finalized" { - filter = filter.from_block(BlockNumberOrTag::Finalized); - } else if let Ok(block_num) = from_block_str.parse::<u64>() { - filter = filter.from_block(BlockNumberOrTag::Number(block_num)); - } - } - - Ok(GravityTask::MonitorEvent(filter)) - } - - /// Parses block monitoring task parameters - /// - /// # Arguments - /// * `params` - Query parameters containing block monitoring strategy - /// - /// # Returns - /// * `Result<GravityTask>` - The parsed block monitoring task or error - /// - /// # Errors - /// * Returns an error if the strategy parameter is missing or unsupported - fn parse_block_task(&self, params: &HashMap<String, String>) -> Result<GravityTask> { - match params.get("strategy").map(|s| s.as_str()) { - Some("head") => Ok(GravityTask::MonitorBlockHead), - Some(strategy) => Err(anyhow!("Unsupported block strategy: {}", strategy)), - None => Err(anyhow!("Missing 'strategy' parameter for block monitoring")), - } - } - - /// Parses storage monitoring task parameters - /// - /// # Arguments - /// * `params` - Query parameters containing account and slot information - /// - /// # Returns - /// * `Result<GravityTask>` - The parsed storage monitoring task or error - /// - /// # Errors - /// * Returns an error if account or slot parameters are missing or invalid - fn parse_storage_task(&self, params: &HashMap<String, String>) -> Result<GravityTask> { - let account_str = params - .get("account") - .ok_or_else(|| anyhow!("Missing 'account' parameter for storage monitoring"))?; - let slot_str = params - .get("slot") - .ok_or_else(|| anyhow!("Missing 'slot' parameter for storage monitoring"))?; - - let account: Address = account_str - .parse() - .map_err(|e| anyhow!("Invalid account address '{}': {}", account_str, e))?; - let slot: B256 = - slot_str.parse().map_err(|e| anyhow!("Invalid slot value '{}': {}", slot_str, e))?; - - Ok(GravityTask::MonitorStorage { account, slot }) - } - - /// Parses account activity monitoring task parameters - /// - /// # Arguments - /// * `path` - The URI path containing account address - /// * `params` - Query parameters containing activity type - /// - /// # Returns - /// * `Result<GravityTask>` - The parsed account monitoring task or error - /// - /// # Errors - /// * Returns an error if the path format is invalid or activity type is unsupported - fn parse_account_task( - &self, - path: &str, - params: &HashMap<String, String>, - ) -> Result<GravityTask> { - // Path format: /account/0x.../activity - let path_parts: Vec<&str> = path.split('/').collect(); - if path_parts.len() != 4 || path_parts[1] != "account" || path_parts[3] != "activity" { - return Err(anyhow!("Invalid account path format: {}", path)); - } - - let address_str = path_parts[2]; - let address: Address = address_str - .parse() - .map_err(|e| anyhow!("Invalid account address '{}': {}", address_str, e))?; - - let activity_type = match params.get("type").map(|s| s.as_str()) { - Some("erc20_transfer") => AccountActivityType::Erc20Transfer, - Some("all_transactions") => AccountActivityType::AllTransactions, - Some(activity_type) => { - return Err(anyhow!("Unsupported activity type: {}", activity_type)) - } - None => return Err(anyhow!("Missing 'type' parameter for account activity monitoring")), - }; - - Ok(GravityTask::MonitorAccount { address, activity_type }) - } - - /// Parse multiple URIs in batch - /// - /// # Arguments - /// *
`uris` - A slice of URI strings to parse - /// - /// # Returns - /// * `Result<Vec<ParsedTask>>` - A vector of parsed tasks or error - /// - /// # Errors - /// * Returns an error if any URI in the batch fails to parse - pub fn parse_batch(&self, uris: &[String]) -> Result<Vec<ParsedTask>> { - let mut tasks = Vec::new(); - for uri in uris { - tasks.push(self.parse(uri)?); - } - Ok(tasks) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::str::FromStr; - - #[test] - fn test_parse_block_head_uri() { - let parser = UriParser::new(); - let uri = "gravity://mainnet/block?strategy=head"; - - let result = parser.parse(uri).unwrap(); - - assert_eq!(result.chain_specifier, "mainnet"); - assert_eq!(result.original_uri, uri); - - match result.task { - GravityTask::MonitorBlockHead => {} - _ => panic!("Expected MonitorBlockHead task type"), - } - } - - #[test] - fn test_parse_event_uri() { - let parser = UriParser::new(); - let uri = "gravity://mainnet/event?address=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; - - let result = parser.parse(uri).unwrap(); - - assert_eq!(result.chain_specifier, "mainnet"); - - match result.task { - GravityTask::MonitorEvent(filter) => { - println!("filter: {:?}", filter); - // Verify filter contains correct address and topic - assert!(filter.has_topics()); - } - _ => panic!("Expected MonitorEvent task type"), - } - } - - #[test] - fn test_parse_storage_uri() { - let parser = UriParser::new(); - let uri = "gravity://mainnet/storage?account=0x123456789abcdef123456789abcdef1234567890&slot=0x0000000000000000000000000000000000000000000000000000000000000001"; - - let result = parser.parse(uri).unwrap(); - - match result.task { - GravityTask::MonitorStorage { account, slot } => { - assert_eq!( - account, - Address::from_str("0x123456789abcdef123456789abcdef1234567890").unwrap() - ); - assert_eq!( - slot, - B256::from_str( - "0x0000000000000000000000000000000000000000000000000000000000000001" - ) - .unwrap() - ); - } - _ => panic!("Expected MonitorStorage task type"), - } - } - - #[test] - fn test_parse_account_activity_uri() { - let parser = UriParser::new(); - let uri = "gravity://mainnet/account/0x123456789abcdef123456789abcdef1234567890/activity?type=erc20_transfer"; - - let result = parser.parse(uri).unwrap(); - - match result.task { - GravityTask::MonitorAccount { address, activity_type } => { - assert_eq!( - address, - Address::from_str("0x123456789abcdef123456789abcdef1234567890").unwrap() - ); - assert_eq!(activity_type, AccountActivityType::Erc20Transfer); - } - _ => panic!("Expected MonitorAccount task type"), - } - } - - #[test] - fn test_parse_event_with_multiple_topics() { - let parser = UriParser::new(); - let uri = "gravity://mainnet/event?address=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef&topic1=0x000000000000000000000000123456789abcdef123456789abcdef1234567890"; - - let result = parser.parse(uri).unwrap(); - - match result.task { - GravityTask::MonitorEvent(filter) => { - // Test passes if parsing is successful - println!("filter: {:?}", filter); - // Verify filter contains correct address and topic - assert!(filter.has_topics()); - } - _ => panic!("Expected MonitorEvent task type"), - } - } - - #[test] - fn test_parse_event_with_or_condition() { - let parser = UriParser::new(); - let uri =
"gravity://mainnet/event?topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef,0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; - - let result = parser.parse(uri).unwrap(); - - match result.task { - GravityTask::MonitorEvent(_filter) => { - // Test passes if parsing is successful - } - _ => panic!("Expected MonitorEvent task type"), - } - } - - #[test] - fn test_parse_invalid_scheme() { - let parser = UriParser::new(); - let uri = "http://mainnet/block?strategy=head"; - - let result = parser.parse(uri); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Invalid scheme")); - } - - #[test] - fn test_parse_missing_chain_specifier() { - let parser = UriParser::new(); - let uri = "gravity:///block?strategy=head"; - - let result = parser.parse(uri); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Missing chain specifier")); - } - - #[test] - fn test_parse_unsupported_resource() { - let parser = UriParser::new(); - let uri = "gravity://mainnet/unknown"; - - let result = parser.parse(uri); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Unsupported resource path")); - } - - #[test] - fn test_parse_batch() { - let parser = UriParser::new(); - let uris = vec![ - "gravity://mainnet/block?strategy=head".to_string(), - "gravity://mainnet/event?address=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" - .to_string(), - ]; - - let results = parser.parse_batch(&uris).unwrap(); - assert_eq!(results.len(), 2); - - match &results[0].task { - GravityTask::MonitorBlockHead => {} - _ => panic!("Expected MonitorBlockHead task type"), - } - - match &results[1].task { - GravityTask::MonitorEvent(_) => {} - _ => panic!("Expected MonitorEvent task type"), - } - } -} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/relayer.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/relayer.rs deleted file mode 100644 index cd7c133a49..0000000000 --- a/crates/pipe-exec-layer-ext-v2/relayer/src/relayer.rs +++ /dev/null @@ -1,824 +0,0 @@ -//! Relayer for gravity protocol tasks - -use crate::{ - eth_client::EthHttpCli, - parser::{AccountActivityType, GravityTask, ParsedTask}, -}; -use alloy_primitives::{Address, B256}; -use alloy_rpc_types::{BlockNumberOrTag, Filter, FilterBlockOption, Log}; -use alloy_sol_macro::sol; -use alloy_sol_types::{SolEvent, SolValue}; -use anyhow::{anyhow, Result}; -use gravity_api_types::on_chain_config::jwks::JWKStruct; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use tokio::sync::Mutex; -use tracing::{debug, error, info}; - -/// DepositGravityEvent(address user, uint256 amount, address targetValidator, uint256 blockNumber); -pub const DEPOSIT_GRAVITY_EVENT_SIGNATURE: [u8; 32] = [ - 0xd5, 0x3b, 0xfb, 0x63, 0x0c, 0x04, 0x65, 0x4c, 0x6d, 0x1d, 0xa5, 0x02, 0x0f, 0x14, 0x67, 0x4f, - 0x19, 0x0f, 0x92, 0xc2, 0x57, 0xc9, 0x2d, 0x9b, 0x15, 0xd8, 0xec, 0xb4, 0x05, 0x05, 0x7c, 0x14, -]; - -/// ChangeRecord(bytes32 indexed key, bytes32 indexed value, uint256 blockNumber, address indexed -/// updater, uint256 sequenceNumber); -pub const CHANGE_RECORD_EVENT_SIGNATURE: [u8; 32] = [ - 0xf6, 0x9d, 0x80, 0xcc, 0x71, 0xff, 0xd8, 0x74, 0x04, 0x59, 0x74, 0xba, 0x04, 0x6a, 0x0b, 0xee, - 0x23, 0x1a, 0xd0, 0x5e, 0xc4, 0x59, 0x0b, 0xdd, 0xe9, 0x85, 0x75, 0xcf, 0xe0, 0x7f, 0xd7, 0x66, -]; - -sol! 
{
-    struct UnsupportedJWK {
-        bytes id;
-        bytes payload;
-    }
-
-    event DepositGravityEvent(
-        address user,
-        uint256 amount,
-        address targetAddress,
-        uint256 blockNumber
-    );
-
-    event ChangeRecord(
-        bytes32 key,
-        bytes32 value,
-        uint256 blockNumber,
-        address updater,
-        uint256 sequenceNumber
-    );
-}
-
-/// Represents the current state of observation for a gravity task
-///
-/// This struct tracks the block number and observed value of the last observed state.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct ObserveState {
-    /// The block number at which the observation was made
-    pub block_number: u64,
-    /// The actual observed value (block, events, storage slot, or none)
-    pub observed_value: ObservedValue,
-}
-
-/// Result of a polling operation, containing the observed state and the maximum block queried
-#[derive(Debug, Clone)]
-pub struct PollResult {
-    /// The observed state from this poll
-    pub observed_state: ObserveState,
-    /// The maximum block number that was actually queried in this poll
-    pub max_queried_block: u64,
-    /// Whether the observed state was updated
-    pub updated: bool,
-}
-
-/// Represents different types of observed values from blockchain monitoring
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub enum ObservedValue {
-    /// Observed block information
-    Block {
-        /// Hash of the observed block
-        block_hash: B256,
-        /// Number of the observed block
-        block_number: u64,
-    },
-    /// Observed event logs
-    Events {
-        /// Collection of event logs that were observed
-        logs: Vec<EventLog>,
-    },
-    /// Observed storage slot value
-    StorageSlot {
-        /// Storage slot that was observed
-        slot: B256,
-        /// Value stored in the slot
-        value: B256,
-    },
-    /// No observation made
-    None,
-}
-
-/// Represents different types of event data that can be processed
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum EventDataType {
-    /// Raw event data without specific parsing
-    Raw,
-    /// Deposit gravity event with structured data
-    DepositGravityEvent,
-    /// Change record event with structured data
-    ChangeRecord,
-}
-
-/// Represents a blockchain event log with all relevant metadata
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct EventLog {
-    /// Contract address that emitted the event
-    pub address: Address,
-    /// Event topics (indexed parameters)
-    pub topics: Vec<B256>,
-    /// Event data (non-indexed parameters)
-    pub data: Vec<u8>,
-    /// Block number where the event occurred
-    pub block_number: u64,
-    /// Transaction hash that triggered the event
-    pub transaction_hash: B256,
-    /// Log index within the transaction
-    pub log_index: u64,
-    /// Event data type identifier for categorizing events
-    pub data_type: u8,
-}
-
-impl From<&Log> for EventLog {
-    /// Converts an Alloy Log to an EventLog
-    ///
-    /// # Arguments
-    /// * `log` - The Alloy Log to convert
-    ///
-    /// # Returns
-    /// * `EventLog` - The converted event log
-    fn from(log: &Log) -> Self {
-        let mut event_log = Self {
-            address: log.address(),
-            topics: log.topics().to_vec(),
-            data: log.data().data.to_vec(),
-            block_number: log.block_number.unwrap_or_default(),
-            transaction_hash: log.transaction_hash.unwrap_or_default(),
-            log_index: log.log_index.unwrap_or_default(),
-            data_type: 0,
-        };
-
-        // Automatically determine and set the event data type
-        event_log.update_data_type();
-        event_log
-    }
-}
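-
-// Signature-based dispatch: determine_event_data_type() below matches topics[0]
-// against the keccak256 topic-0 constants declared at the top of this module,
-// i.e. the hash of the canonical signature such as
-// "DepositGravityEvent(address,uint256,address,uint256)".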
-
-impl EventLog {
-    /// Determines the event data type based on the event signature (first topic)
-    ///
-    /// # Returns
-    /// * `EventDataType` - The detected event data type
-    pub fn determine_event_data_type(&self) -> EventDataType {
-        // First check if we have topics (event signature)
-        if self.topics.is_empty() {
-            return EventDataType::Raw;
-        }
-
-        // Get the event signature from the first topic
-        let event_signature = self.topics[0];
-
-        // Match based on event signature using cached values
-        if event_signature == DEPOSIT_GRAVITY_EVENT_SIGNATURE {
-            let stake_event = DepositGravityEvent::abi_decode_data(&self.data).unwrap();
-            info!(target: "relayer stake event",
-                user=?stake_event.0,
-                amount=?stake_event.1,
-                target_validator=?stake_event.2,
-                block_number=?stake_event.3,
-                "relayer stake event created"
-            );
-            EventDataType::DepositGravityEvent
-        } else if event_signature == CHANGE_RECORD_EVENT_SIGNATURE {
-            // ChangeRecord has indexed parameters (key, value, updater) in topics and
-            // non-indexed parameters (blockNumber, sequenceNumber) in data
-            let change_record_data = ChangeRecord::abi_decode_data(&self.data).unwrap();
-            // Extract indexed parameters from topics (topics[1] = key, topics[2] = value,
-            // topics[3] = updater)
-            let key = if self.topics.len() > 1 { Some(self.topics[1]) } else { None };
-            let value = if self.topics.len() > 2 { Some(self.topics[2]) } else { None };
-            let updater = if self.topics.len() > 3 {
-                Some(Address::from_slice(&self.topics[3].0[12..]))
-            } else {
-                None
-            };
-            info!(target: "relayer change record event",
-                key=?key,
-                value=?value,
-                block_number=?change_record_data.0,
-                updater=?updater,
-                sequence_number=?change_record_data.1,
-                "relayer change record event created"
-            );
-            EventDataType::ChangeRecord
-        } else {
-            EventDataType::Raw
-        }
-    }
-
-    /// Updates the data_type field based on the determined event type
-    pub fn update_data_type(&mut self) {
-        let event_type = self.determine_event_data_type();
-        self.data_type = match event_type {
-            EventDataType::Raw => 0,
-            EventDataType::DepositGravityEvent => 1,
-            EventDataType::ChangeRecord => 2,
-        };
-    }
-}
-
-impl Into<JWKStruct> for &EventLog {
-    fn into(self) -> JWKStruct {
-        let unsupported_jwk = UnsupportedJWK {
-            id: self.data_type.to_string().into_bytes().into(),
-            payload: self.data.clone().into(),
-        };
-        debug!(target: "relayer", "generate unsupported_jwk: {:?}", unsupported_jwk.abi_encode());
-        JWKStruct {
-            type_name: "0x1::jwks::UnsupportedJWK".to_string(),
-            data: unsupported_jwk.abi_encode().into(),
-        }
-    }
-}
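-
-// Each observed log is shipped to the JWK consensus pipeline as an UnsupportedJWK:
-// `id` carries the data_type tag (as decimal ASCII bytes) and `payload` the raw
-// event data, so arbitrary observed values fit the JWK envelope.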
-
-/// Internal state for managing a gravity task
-#[derive(Debug)]
-struct TaskState {
-    task: ParsedTask,
-    cursor: Mutex<u64>,
-    last_observed: Mutex<Arc<ObserveState>>,
-}
-
-impl TaskState {
-    /// Creates a new TaskState instance
-    ///
-    /// # Arguments
-    /// * `task` - The parsed task to manage
-    /// * `start_block` - The block number to start monitoring from
-    /// * `last_observed` - The last observed state
-    ///
-    /// # Returns
-    /// * `TaskState` - The new task state instance
-    fn new(task: ParsedTask, start_block: u64, last_observed: Arc<ObserveState>) -> Self {
-        Self { task, cursor: Mutex::new(start_block), last_observed: Mutex::new(last_observed) }
-    }
-
-    /// Gets the current cursor position
-    ///
-    /// # Returns
-    /// * `u64` - The current block number cursor
-    async fn get_cursor(&self) -> u64 {
-        *self.cursor.lock().await
-    }
-
-    /// Updates the cursor position
-    ///
-    /// # Arguments
-    /// * `cursor` - The new cursor position
-    async fn update_cursor(&self, cursor: u64) {
-        *self.cursor.lock().await = cursor;
-    }
-
-    /// Gets the last observed state
-    ///
-    /// # Returns
-    /// * `Arc<ObserveState>` - The last observed state
-    async fn last_observed(&self) -> Arc<ObserveState> {
-        self.last_observed.lock().await.clone()
-    }
-
-    /// Checks if the observed value should trigger an update
-    ///
-    /// # Arguments
-    /// * `observed_value` - The newly observed value to compare
-    ///
-    /// # Returns
-    /// * `bool` - True if the value has changed and should trigger an update
-    async fn should_update(&self, observed_value: &ObservedValue) -> bool {
-        self.last_observed().await.observed_value != *observed_value
-    }
-
-    /// Updates the last observed state
-    ///
-    /// # Arguments
-    /// * `last_observed` - The new observed state
-    async fn update_last_observed(&self, last_observed: ObserveState) {
-        *self.last_observed.lock().await = Arc::new(last_observed);
-    }
-}
-
-/// Main relayer for gravity protocol tasks
-///
-/// This struct handles the monitoring and polling of various blockchain events,
-/// blocks, and storage slots based on parsed gravity tasks.
-pub struct GravityRelayer {
-    /// Ethereum client for blockchain communication
-    eth_client: Arc<EthHttpCli>,
-    /// Internal task state management
-    task_state: TaskState,
-}
-
-impl std::fmt::Debug for GravityRelayer {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("GravityRelayer")
-            .field("eth_client", &self.eth_client)
-            .field("task_state", &self.task_state)
-            .finish()
-    }
-}
-
-impl GravityRelayer {
-    /// Maximum number of blocks to poll in one request to avoid overwhelming the system
-    const MAX_BLOCKS_PER_POLL: u64 = 100;
-
-    /// Calculates the appropriate block range for polling based on cursor and finalized block
-    ///
-    /// # Arguments
-    /// * `cursor` - The current cursor position
-    /// * `finalized_block` - The latest finalized block number
-    ///
-    /// # Returns
-    /// * `u64` - The calculated to_block for polling
-    fn calculate_poll_block_range(cursor: u64, finalized_block: u64) -> u64 {
-        std::cmp::min(cursor + Self::MAX_BLOCKS_PER_POLL, finalized_block)
-    }
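-
-    // Worked example: with cursor = 100 and finalized_block = 1_000 the poll is
-    // clamped to to_block = 200 (cursor + MAX_BLOCKS_PER_POLL); with cursor = 950
-    // it is capped at the finalized head, to_block = 1_000.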
-
-    /// Extracts the from_block number from a Filter if it's a numeric block number
-    ///
-    /// # Arguments
-    /// * `filter` - The filter to extract from_block from
-    ///
-    /// # Returns
-    /// * `Option<u64>` - The block number if it's a numeric value, None otherwise
-    fn extract_from_block_number(filter: &Filter) -> Option<u64> {
-        match &filter.block_option {
-            FilterBlockOption::Range { from_block, .. } => {
-                if let Some(block_num) = from_block {
-                    match block_num {
-                        BlockNumberOrTag::Number(n) => Some(*n),
-                        _ => None,
-                    }
-                } else {
-                    None
-                }
-            }
-            _ => None,
-        }
-    }
-
-    /// Creates a new GravityRelayer instance
-    ///
-    /// # Arguments
-    /// * `rpc_url` - The RPC endpoint URL for blockchain communication
-    /// * `task` - The parsed task to monitor
-    /// * `from_block` - Fallback starting block, combined with the filter's fromBlock via max
-    ///
-    /// # Returns
-    /// * `Result<Self>` - The new relayer instance or error
-    ///
-    /// # Errors
-    /// * Returns an error if unable to connect to the RPC endpoint or get finalized block
-    pub async fn new(rpc_url: &str, task: ParsedTask, from_block: u64) -> Result<Self> {
-        let eth_client = Arc::new(EthHttpCli::new(rpc_url)?);
-
-        // Get the starting block number from the task filter or use the provided from_block
-        // parameter. If both exist, use the maximum of the two.
-        let start_block_number = match &task.task {
-            GravityTask::MonitorEvent(filter) => {
-                if let Some(uri_from_block) = Self::extract_from_block_number(filter) {
-                    // Use max of URI fromBlock and parameter from_block
-                    std::cmp::max(uri_from_block, from_block)
-                } else {
-                    from_block
-                }
-            }
-            _ => from_block,
-        };
-        let last_observed =
-            ObserveState { block_number: start_block_number, observed_value: ObservedValue::None };
-
-        info!(target: "relayer",
-            rpc_url=?rpc_url,
-            from_block=?start_block_number,
-            task=?task,
-            "relayer created"
-        );
-        let task_state = TaskState::new(task.clone(), start_block_number, Arc::new(last_observed));
-        Ok(Self { eth_client, task_state })
-    }
-
-    /// Polls the current task once for updates
-    ///
-    /// This method delegates to specific polling methods based on the task type.
-    ///
-    /// # Returns
-    /// * `Result<PollResult>` - The poll result containing observed state and max queried block
-    ///
-    /// # Errors
-    /// * Returns an error if polling fails for any reason
-    pub async fn poll_once(&self) -> Result<PollResult> {
-        let task_uri = &self.task_state.task.original_uri;
-        let state = match &self.task_state.task.task {
-            GravityTask::MonitorEvent(filter) => self.poll_event_task(task_uri, filter).await,
-            GravityTask::MonitorBlockHead => self.poll_block_head_task(task_uri).await,
-            GravityTask::MonitorStorage { account, slot } => {
-                self.poll_storage_slot_task(task_uri, *account, *slot).await
-            }
-            GravityTask::MonitorAccount { address, activity_type } => {
-                self.poll_account_activity_task(task_uri, *address, activity_type).await
-            }
-        };
-        match state {
-            Ok(poll_result) => match poll_result.observed_state.observed_value {
-                ObservedValue::None => Err(anyhow!("Fetched none")),
-                _ => Ok(poll_result),
-            },
-            Err(e) => {
-                error!("Error polling task {}: {}", task_uri, e);
-                Err(e)
-            }
-        }
-    }
-
-    /// Converts an observed state into JWK structures for Gravity protocol
-    ///
-    /// # Arguments
-    /// * `observed_state` - The observed state to convert
-    ///
-    /// # Returns
-    /// * `Result<Vec<JWKStruct>>` - A vector of JWK structures or error
-    ///
-    /// # Errors
-    /// * Returns an error if serialization fails
-    pub async fn convert_specific_observed_value(
-        observed_state: ObserveState,
-    ) -> Result<Vec<JWKStruct>> {
-        let jwk = match observed_state.observed_value {
-            ObservedValue::Events { logs } => logs.iter().map(|log| log.into()).collect(),
-            _ => {
-                vec![JWKStruct {
-                    type_name: "0".to_string(),
-                    data: serde_json::to_vec(&observed_state).expect("failed to serialize state"),
-                }]
-            }
-        };
-        Ok(jwk)
-    }
-
-    /// Polls for event logs based on the provided filter
-    ///
-    /// # Arguments
-    /// * `task_uri` - The URI being monitored (for logging)
-    /// * `filter` - The event filter to apply
-    ///
-    /// # Returns
-    /// * `Result<PollResult>` - The poll result with observed state and max queried block
-    async fn poll_event_task(&self, task_uri: &str, filter: &Filter) -> Result<PollResult> {
-        let cursor = self.task_state.get_cursor().await;
-        let previous_value = self.task_state.last_observed().await;
-
-        let mut scoped_filter = filter.clone();
-        scoped_filter = scoped_filter.from_block(cursor);
-
-        // Get finalized block with retry logic
-        let finalized_block = self.eth_client.get_finalized_block_number().await?;
-
-        // Calculate the appropriate block range for polling
-        let to_block = Self::calculate_poll_block_range(cursor, finalized_block);
-        scoped_filter = scoped_filter.to_block(to_block);
-
-        debug!(target: "relayer",
-            task_uri=?task_uri,
-            scoped_filter=?scoped_filter,
-            "polling event task"
-        );
-        let logs = self.eth_client.get_logs(&scoped_filter).await?;
-
-        let new_logs: Vec<EventLog> = logs
-            .iter()
-            .filter(|log| log.block_number.unwrap_or(0) > cursor)
-            .map(|log| log.into())
-            .collect();
-
-        if new_logs.is_empty() {
-            // Update cursor to the to_block we actually queried
-            let next_cursor = to_block;
-            self.task_state.update_cursor(next_cursor).await;
-            debug!(target: "relayer",
-                task_uri=?task_uri,
-                next_cursor=?next_cursor,
-                "polling event task with no new logs"
-            );
-            // Return previous value with max_queried_block
-            return Ok(PollResult {
-                observed_state: (*previous_value).clone(),
-                max_queried_block: to_block,
-                updated: false,
-            });
-        }
-
-        let observed_value = ObservedValue::Events { logs: new_logs.clone() };
-
-        let should_update = self.task_state.should_update(&observed_value).await;
-
-        let return_value = if should_update {
-            // Update cursor to the to_block we actually queried, not just the max log block
-            let new_cursor = to_block;
-            self.task_state.update_cursor(new_cursor).await;
-            let new_value =
-                ObserveState { block_number: new_cursor, observed_value: observed_value.clone() };
-
-            self.task_state.update_last_observed(new_value.clone()).await;
-            PollResult { observed_state: new_value, max_queried_block: to_block, updated: true }
-        } else {
-            // Even if no update, we should still advance the cursor to avoid getting stuck
-            self.task_state.update_cursor(to_block).await;
-            PollResult {
-                observed_state: (*previous_value).clone(),
-                max_queried_block: to_block,
-                updated: false,
-            }
-        };
-
-        debug!(target: "relayer",
-            task_uri=?task_uri,
-            cursor=?cursor,
-            should_update=?should_update,
-            "polling event task completed"
-        );
-        Ok(return_value)
-    }
-
-    /// Polls for the latest block head
-    ///
-    /// # Arguments
-    /// * `task_uri` - The URI being monitored (for logging)
-    ///
-    /// # Returns
-    /// * `Result<PollResult>` - The poll result with observed state and max queried block
-    async fn poll_block_head_task(&self, task_uri: &str) -> Result<PollResult> {
-        let finalized_block = self.eth_client.get_finalized_block_number().await?;
-
-        let cursor = self.task_state.get_cursor().await;
-        let previous_value = self.task_state.last_observed().await;
-
-        // For block head polling, we only need to check the latest finalized block
-        let to_block = Self::calculate_poll_block_range(cursor, finalized_block);
-
-        let return_value = if to_block > cursor {
-            let block_hash = match self.eth_client.get_block(to_block).await? {
-                Some(block) => block.header.hash,
-                None => B256::ZERO,
-            };
-
-            let observed_value = ObservedValue::Block { block_hash, block_number: to_block };
-
-            let should_update = self.task_state.should_update(&observed_value).await;
-
-            if should_update {
-                self.task_state.update_cursor(to_block).await;
-                let new_value =
-                    ObserveState { block_number: to_block, observed_value: observed_value.clone() };
-
-                self.task_state.update_last_observed(new_value.clone()).await;
-                PollResult { observed_state: new_value, max_queried_block: to_block, updated: true }
-            } else {
-                // Even if no update, we should still advance the cursor to avoid getting stuck
-                self.task_state.update_cursor(to_block).await;
-                PollResult {
-                    observed_state: (*previous_value).clone(),
-                    max_queried_block: to_block,
-                    updated: false,
-                }
-            }
-        } else {
-            PollResult {
-                observed_state: (*previous_value).clone(),
-                max_queried_block: to_block,
-                updated: false,
-            }
-        };
-
-        debug!(target: "relayer",
-            task_uri=?task_uri,
-            cursor=?cursor,
-            "polling block head task completed"
-        );
-        Ok(return_value)
-    }
-
-    /// Polls for storage slot value changes
-    ///
-    /// # Arguments
-    /// * `task_uri` - The URI being monitored (for logging)
-    /// * `account` - The contract address to monitor
-    /// * `slot` - The storage slot to monitor
-    ///
-    /// # Returns
-    /// * `Result<PollResult>` - The poll result with observed state and max queried block
-    async fn poll_storage_slot_task(
-        &self,
-        task_uri: &str,
-        account: Address,
-        slot: B256,
-    ) -> Result<PollResult> {
-        let cursor = self.task_state.get_cursor().await;
-        let finalized_block = self.eth_client.get_finalized_block_number().await?;
-
-        // For storage slot polling, we only need to check the latest finalized block
-        let to_block = Self::calculate_poll_block_range(cursor, finalized_block);
-
-        let current_value = self.eth_client.get_storage_at(account, slot).await?;
-        let observed_value = ObservedValue::StorageSlot { slot, value: current_value };
-
-        let should_update = self.task_state.should_update(&observed_value).await;
-        let previous_value = self.task_state.last_observed().await;
-
-        let return_value = if should_update {
-            self.task_state.update_cursor(to_block).await;
-            let new_value =
-                ObserveState { block_number: to_block, observed_value: observed_value.clone() };
-            self.task_state.update_last_observed(new_value.clone()).await;
-            PollResult { observed_state: new_value, max_queried_block: to_block, updated: true }
-        } else {
-            // Even if no update, we should still advance the cursor to avoid getting stuck
-            self.task_state.update_cursor(to_block).await;
-            PollResult {
-                observed_state: (*previous_value).clone(),
-                max_queried_block: to_block,
-                updated: false,
-            }
-        };
-        debug!(target: "relayer",
-            task_uri=?task_uri,
-            cursor=?cursor,
-            "polling storage slot task completed"
-        );
-        Ok(return_value)
-    }
-
-    /// Polls for account activity based on the specified activity type
-    ///
-    /// # Arguments
-    /// * `task_uri` - The URI being monitored (for logging)
-    /// * `address` - The account address to monitor
-    /// * `activity_type` - The type of activity to monitor
-    ///
-    /// # Returns
-    /// * `Result<PollResult>` - The poll result with observed state and max queried block
-    async fn poll_account_activity_task(
-        &self,
-        _task_uri: &str,
-        _address: Address,
-        _activity_type: &AccountActivityType,
-    ) -> Result<PollResult> {
-        // TODO: Implement account activity monitoring
-        unimplemented!()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use alloy_primitives::{address, hex, Bytes, B256};
-    use alloy_rpc_types::Filter;
-    use
reth_primitives::Log; - - use crate::{ - relayer::DepositGravityEvent, EthHttpCli, GravityRelayer, ObservedValue, UriParser, - }; - use alloy_sol_macro::sol; - use alloy_sol_types::SolEvent; - - sol! { - contract USDC { - event USDCTransfer( - address indexed from, - address indexed to, - uint256 amount, - uint256 timestamp - ); - - event ChangeRecord( - bytes32 key, - bytes32 value, - uint256 blockNumber, - address updater, - uint256 sequenceNumber - ); - } - } - - #[tokio::test] - async fn test_parsed_and_run() { - let uri = std::env::var("TEST_URI") - .expect("TEST_URI environment variable must be set for this test"); - let rpc_url = std::env::var("RPC_URL") - .expect("RPC_URL environment variable must be set for this test"); - - // let uri = "gravity://31337/event?address=0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512& - // topic0=0x3915136b10c16c5f181f4774902f3baf9e44a5f700cabf5c826ee1caed313624"; - let parser = UriParser::new(); - let task = parser.parse(&uri).expect("Failed to parse test URI"); - println!("task: {:?}", task); - - let relayer = - GravityRelayer::new(&rpc_url, task, 0).await.expect("Failed to create relayer"); - - let state = relayer.poll_once().await.expect("Failed to poll relayer"); - println!("state: {:?}", state); - - match state.observed_state.observed_value { - ObservedValue::Events { logs } => { - for log in logs { - let log_obj = Log::new(log.address, log.topics, Bytes::from(log.data)) - .expect("Failed to create log object"); - let decoded = USDC::USDCTransfer::decode_log(&log_obj) - .expect("Failed to decode USDC transfer event"); - - let data = decoded.data; - let from = data.from; - let to = data.to; - let amount = data.amount; - let timestamp = data.timestamp; - println!( - "from: {:?}, to: {:?}, amount: {:?}, timestamp: {:?}", - from, to, amount, timestamp - ); - } - } - _ => {} - } - } - - #[tokio::test] - async fn test_change_record_parsed_and_run() { - // URI components - modify these values as needed - let chain_id = "31337"; - let contract_address = "0x5FbDB2315678afecb367f032d93F642f64180aa3"; - let topic0 = "0xf69d80cc71ffd874045974ba046a0bee231ad05ec4590bdde98575cfe07fd766"; // ChangeRecord event signature - let from_block = "120"; - - // Construct URI from components - let uri = format!( - "gravity://{}/event?address={}&topic0={}&fromBlock={}", - chain_id, contract_address, topic0, from_block - ); - let rpc_url = "http://localhost:8545".to_string(); - - // ChangeRecord event signature: ChangeRecord(bytes32,bytes32,uint256,address,uint256) - // topic0: 0xf69d80cc71ffd874045974ba046a0bee231ad05ec4590bdde98575cfe07fd766 - let parser = UriParser::new(); - let task = parser.parse(&uri).expect("Failed to parse test URI"); - println!("task: {:?}", task); - - let relayer = - GravityRelayer::new(&rpc_url, task, 0).await.expect("Failed to create relayer"); - - let state = relayer.poll_once().await.expect("Failed to poll relayer"); - println!("state: {:?}", state); - - match state.observed_state.observed_value { - ObservedValue::Events { logs } => { - for log in logs { - let log_obj = Log::new(log.address, log.topics, Bytes::from(log.data)) - .expect("Failed to create log object"); - let decoded = USDC::ChangeRecord::decode_log(&log_obj) - .expect("Failed to decode ChangeRecord event"); - - let data = decoded.data; - let key = data.key; - let value = data.value; - let block_number = data.blockNumber; - let updater = data.updater; - let sequence_number = data.sequenceNumber; - println!( - "key: {:?}, value: {:?}, block_number: {:?}, updater: {:?}, 
sequence_number: {:?}", - key, value, block_number, updater, sequence_number - ); - } - } - _ => {} - } - } - - #[tokio::test] - async fn test_direct() { - // Create mock eth client - this needs actual test implementation - let rpc_url = "https://sepolia.drpc.org".to_string(); - let eth_client = EthHttpCli::new(&rpc_url).expect("Failed to create ETH client"); - - let deposit_gravity_event_signature: [u8; 32] = [ - 0xd5, 0x3b, 0xfb, 0x63, 0x0c, 0x04, 0x65, 0x4c, 0x6d, 0x1d, 0xa5, 0x02, 0x0f, 0x14, - 0x67, 0x4f, 0x19, 0x0f, 0x92, 0xc2, 0x57, 0xc9, 0x2d, 0x9b, 0x15, 0xd8, 0xec, 0xb4, - 0x05, 0x05, 0x7c, 0x14, - ]; - let filter = Filter::new() - .address(address!("0x283fC6799867BF96bF862a05BDade3EE89132027")) - .event_signature(B256::from(deposit_gravity_event_signature)) - .from_block(9565280) - .to_block(9565290); - - let logs = eth_client.get_logs(&filter).await.expect("Failed to get logs"); - println!("logs: {:?}", logs); - - for log in logs { - let decoded = log.log_decode::().expect("Failed to decode log"); - let data = decoded.data(); - let user = data.user; - let amount = data.amount; - let target_address = data.targetAddress; - let block_number = data.blockNumber; - println!( - "user: {:?}, amount: {:?}, target_validator: {:?}, block_number: {:?}", - user, amount, target_address, block_number - ); - } - } -} diff --git a/crates/pipe-exec-layer-ext-v2/relayer/src/uri_parser.rs b/crates/pipe-exec-layer-ext-v2/relayer/src/uri_parser.rs new file mode 100644 index 0000000000..3171b3e471 --- /dev/null +++ b/crates/pipe-exec-layer-ext-v2/relayer/src/uri_parser.rs @@ -0,0 +1,136 @@ +//! Oracle URI Parser +//! +//! Parses extended gravity:// URIs that contain all oracle task configuration. +//! +//! ## URI Format +//! +//! ```text +//! gravity:////? +//! ``` +//! +//! ### Examples +//! - Blockchain events: `gravity://0/1/events?portal=0x283fC6...&fromBlock=9565280` + +use alloy_primitives::Address; +use anyhow::{anyhow, Result}; +use std::collections::HashMap; +use url::Url; + +/// Parsed oracle task from URI +#[derive(Debug, Clone)] +pub struct ParsedOracleTask { + /// Original URI string + pub uri: String, + + /// Source type (0=BLOCKCHAIN) + pub source_type: u32, + + /// Source identifier (chain ID, etc.) + pub source_id: u64, + + /// Task type ("events", etc.) + pub task_type: String, + + /// Query parameters + pub params: HashMap, +} + +impl ParsedOracleTask { + /// Get portal/contract address (for blockchain events) + /// Accepts either 'contract' or 'portal' parameter name + pub fn portal_address(&self) -> Result
+    pub fn portal_address(&self) -> Result<Address> {
+        let addr_str = self
+            .params
+            .get("contract")
+            .or_else(|| self.params.get("portal"))
+            .ok_or_else(|| anyhow!("Missing 'contract' or 'portal' parameter in URI"))?;
+
+        addr_str.parse::<Address>().map_err(|e| anyhow!("Invalid contract address: {}", e))
+    }
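+
+    // Illustrative: for "gravity://0/1/events?fromBlock=9565280", from_block()
+    // below returns 9565280; when the parameter is absent or unparsable it
+    // falls back to 0.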
+
+    /// Get fromBlock parameter
+    pub fn from_block(&self) -> u64 {
+        self.params.get("fromBlock").and_then(|s| s.parse().ok()).unwrap_or(0)
+    }
+
+    /// Check if this is a blockchain source
+    pub fn is_blockchain(&self) -> bool {
+        self.source_type == 0
+    }
+}
+
+/// Parse a gravity:// URI into task configuration
+///
+/// # Examples
+///
+/// ```ignore
+/// let task = parse_oracle_uri("gravity://0/1/events?portal=0x283fC6799867BF96bF862a05BDade3EE89132027&fromBlock=100")?;
+/// assert_eq!(task.source_type, 0);
+/// assert_eq!(task.source_id, 1);
+/// assert_eq!(task.task_type, "events");
+/// ```
+pub fn parse_oracle_uri(uri: &str) -> Result<ParsedOracleTask> {
+    // Validate scheme
+    if !uri.starts_with("gravity://") {
+        return Err(anyhow!("URI must start with 'gravity://'"));
+    }
+
+    // Extract the part after gravity://
+    let rest = &uri[10..]; // len("gravity://") = 10
+
+    // Find the first '/' to separate source_type from the rest
+    let first_slash =
+        rest.find('/').ok_or_else(|| anyhow!("URI must have path after source_type"))?;
+
+    // Parse source_type (the part before the first '/')
+    let source_type: u32 =
+        rest[..first_slash].parse().map_err(|e| anyhow!("Invalid source_type: {}", e))?;
+
+    // Now parse the rest using the url crate with a dummy host:
+    // format the URL as http://dummy/<source_id>/<task_type>?<params>
+    let rest_after_source_type = &rest[first_slash..]; // includes leading /
+    let url_str = format!("http://dummy{}", rest_after_source_type);
+    let url = Url::parse(&url_str).map_err(|e| anyhow!("Failed to parse URI: {}", e))?;
+
+    // Parse path segments: /<source_id>/<task_type>
+    let path_segments: Vec<&str> = url.path_segments().map(|s| s.collect()).unwrap_or_default();
+
+    if path_segments.is_empty() {
+        return Err(anyhow!("URI must have at least source_id in path"));
+    }
+
+    // Parse source_id (first path segment)
+    let source_id: u64 =
+        path_segments[0].parse().map_err(|e| anyhow!("Invalid source_id: {}", e))?;
+
+    // Parse task_type (second path segment, default to "events")
+    let task_type = path_segments.get(1).unwrap_or(&"events").to_string();
+
+    // Parse query parameters
+    let params: HashMap<String, String> = url.query_pairs().into_owned().collect();
+
+    Ok(ParsedOracleTask { uri: uri.to_string(), source_type, source_id, task_type, params })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_blockchain_uri() {
+        let uri = "gravity://0/1/events?portal=0x283fC6799867BF96bF862a05BDade3EE89132027&fromBlock=9565280";
+        let task = parse_oracle_uri(uri).unwrap();
+
+        assert_eq!(task.source_type, 0);
+        assert_eq!(task.source_id, 1);
+        assert_eq!(task.task_type, "events");
+        assert_eq!(task.from_block(), 9565280);
+        assert!(task.portal_address().is_ok());
+    }
+
+    #[test]
+    fn test_invalid_scheme() {
+        let uri = "http://0/1/events";
+        assert!(parse_oracle_uri(uri).is_err());
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 8183c4b925..961cfac7f6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,7 @@ pub use reth_node_core;
 pub use reth_node_ethereum;
 pub use reth_payload_builder;
 pub use reth_pipe_exec_layer_ext_v2;
+pub use reth_pipe_exec_layer_relayer;
 pub use reth_primitives;
 pub use reth_provider;
 pub use reth_rpc;
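A minimal sketch of driving the new parser end to end. The import path and the `main` wiring are illustrative assumptions; only `parse_oracle_uri` and the `ParsedOracleTask` accessors added above are taken from this change.

```rust
// Illustrative driver (assumed module path; adjust to the crate's actual re-exports).
use reth_pipe_exec_layer_relayer::uri_parser::parse_oracle_uri;

fn main() -> anyhow::Result<()> {
    // Same blockchain-events URI exercised by test_parse_blockchain_uri above.
    let uri = "gravity://0/1/events?portal=0x283fC6799867BF96bF862a05BDade3EE89132027&fromBlock=9565280";
    let task = parse_oracle_uri(uri)?;

    assert!(task.is_blockchain()); // source_type 0 = BLOCKCHAIN
    println!(
        "source_id={} task_type={} portal={} from_block={}",
        task.source_id,
        task.task_type,
        task.portal_address()?, // reads 'portal' (or 'contract') from the query
        task.from_block()
    );
    Ok(())
}
```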