From c08d0d6bcf79f544da3f171893620f4ecfb0dc38 Mon Sep 17 00:00:00 2001 From: Sadhbh Code Date: Wed, 8 Oct 2025 20:30:53 +0100 Subject: [PATCH] Removed locks from ColalteralBridges --- .../alloy-evm-connector/src/across_bridge.rs | 72 +++--- .../alloy-evm-connector/src/erc20_bridge.rs | 72 +++--- .../alloy-evm-connector/src/evm_connector.rs | 33 ++- examples/evm_connector/across_bridge.rs | 57 ++--- examples/evm_connector/erc20_bridge.rs | 11 +- examples/evm_connector/generic_bridge.rs | 12 +- examples/evm_connector/main.rs | 54 ++-- .../otc_custody_to_wallet_bridge.rs | 76 +++--- .../src/collateral/signer_to_wallet_bridge.rs | 54 ++-- .../src/collateral/collateral_router.rs | 239 +++++++----------- libs/symm-core/src/core/functional.rs | 22 ++ src/app/chain_connector.rs | 50 ++-- src/app/collateral_router.rs | 11 +- src/app/simple_router.rs | 99 ++++---- src/app/solver.rs | 34 +-- src/collateral/collateral_manager.rs | 35 +-- src/solver/solver.rs | 106 +++----- 17 files changed, 440 insertions(+), 597 deletions(-) diff --git a/_deprecated/alloy-evm-connector/src/across_bridge.rs b/_deprecated/alloy-evm-connector/src/across_bridge.rs index c0f4c85c..06e6de3a 100644 --- a/_deprecated/alloy-evm-connector/src/across_bridge.rs +++ b/_deprecated/alloy-evm-connector/src/across_bridge.rs @@ -4,9 +4,9 @@ use parking_lot::RwLock as AtomicLock; use rust_decimal::dec; use safe_math::safe; use std::str::FromStr; -use std::sync::{Arc, RwLock as ComponentLock, Weak}; +use std::sync::{Arc, Weak}; use symm_core::core::functional::{ - IntoObservableSingleArc, IntoObservableSingleVTable, NotificationHandlerOnce, + IntoObservableSingleArc, IntoObservableSingleVTable, IntoObservableSingleVTableRef, NotificationHandlerOnce }; use crate::chain_operations::ChainOperations; @@ -28,8 +28,8 @@ use symm_core::core::{ pub struct AcrossCollateralBridge { observer: Arc>>, - source: Arc>, - destination: Arc>, + source: Arc, + destination: Arc, // Shared chain_operations injected by EvmConnector chain_operations: Arc>, @@ -38,34 +38,32 @@ pub struct AcrossCollateralBridge { impl AcrossCollateralBridge { /// Constructor that accepts shared chain_operations from EvmConnector pub fn new_with_shared_operations( - source: Arc>, - destination: Arc>, + source: Arc, + destination: Arc, chain_operations: Arc>, - ) -> Arc> { - Arc::new({ - ComponentLock::new(Self { - observer: Arc::new(AtomicLock::new(SingleObserver::new())), - source, - destination, - chain_operations, - }) + ) -> Arc { + Arc::new(Self { + observer: Arc::new(AtomicLock::new(SingleObserver::new())), + source, + destination, + chain_operations, }) } } -impl IntoObservableSingleVTable for AcrossCollateralBridge { - fn set_observer(&mut self, observer: Box>) { +impl IntoObservableSingleVTableRef for AcrossCollateralBridge { + fn set_observer(&self, observer: Box>) { self.observer.write().set_observer(observer); } } impl CollateralBridge for AcrossCollateralBridge { - fn get_source(&self) -> Arc> { - (self.source).clone() as Arc> + fn get_source(&self) -> Arc { + (self.source).clone() as Arc } - fn get_destination(&self) -> Arc> { - (self.destination).clone() as Arc> + fn get_destination(&self) -> Arc { + (self.destination).clone() as Arc } fn transfer_funds( @@ -79,16 +77,16 @@ impl CollateralBridge for AcrossCollateralBridge { cumulative_fee: Amount, ) -> eyre::Result<()> { let observer = self.observer.clone(); - let source = self.source.read().unwrap().get_full_name(); - let destination = self.destination.read().unwrap().get_full_name(); + let source = self.source.get_full_name(); + let destination = self.destination.get_full_name(); if !(&source == "EVM:ARBITRUM:USDC" && &destination == "EVM:BASE:USDC") { return Err(eyre::eyre!("Invalid source and destination")); } // Get designation details - let source_designation = self.source.read().unwrap(); - let destination_designation = self.destination.read().unwrap(); + let source_designation = &self.source; + let destination_designation = &self.destination; // Use direct chain_operations.execute_command() instead of arbiter let command = ChainCommand::ExecuteCompleteAcrossDeposit { @@ -108,19 +106,19 @@ impl CollateralBridge for AcrossCollateralBridge { let timestamp = Utc::now(); // Callback receives the original routing amounts passed through from chain operation /*observer - .read() - .publish_single(CollateralRouterEvent::HopComplete { - chain_id, - address, - client_order_id: client_order_id.clone(), - timestamp, - source: source.clone(), - destination: destination.clone(), - route_from: route_from.clone(), - route_to: route_to.clone(), - amount: total_routed, // Now receives original_amount from chain operation - fee: fee_deducted, // Now receives original_cumulative_fee from chain operation - });*/ + .read() + .publish_single(CollateralRouterEvent::HopComplete { + chain_id, + address, + client_order_id: client_order_id.clone(), + timestamp, + source: source.clone(), + destination: destination.clone(), + route_from: route_from.clone(), + route_to: route_to.clone(), + amount: total_routed, // Now receives original_amount from chain operation + fee: fee_deducted, // Now receives original_cumulative_fee from chain operation + });*/ Ok(()) }), }; diff --git a/_deprecated/alloy-evm-connector/src/erc20_bridge.rs b/_deprecated/alloy-evm-connector/src/erc20_bridge.rs index f47c7ac6..cf150d12 100644 --- a/_deprecated/alloy-evm-connector/src/erc20_bridge.rs +++ b/_deprecated/alloy-evm-connector/src/erc20_bridge.rs @@ -2,8 +2,8 @@ use alloy::primitives::B256; use eyre::{OptionExt, Result}; use parking_lot::RwLock as AtomicLock; use safe_math::safe; -use std::sync::{Arc, RwLock as ComponentLock}; -use symm_core::core::functional::{IntoObservableSingleVTable, NotificationHandlerOnce}; +use std::sync::Arc; +use symm_core::core::functional::{IntoObservableSingleVTable, IntoObservableSingleVTableRef, NotificationHandlerOnce}; use crate::chain_operations::ChainOperations; use crate::commands::ChainCommand; @@ -22,8 +22,8 @@ use symm_core::core::{ pub struct Erc20CollateralBridge { observer: Arc>>, - source: Arc>, - destination: Arc>, + source: Arc, + destination: Arc, // Shared chain_operations injected by EvmConnector chain_operations: Arc>, @@ -32,34 +32,32 @@ pub struct Erc20CollateralBridge { impl Erc20CollateralBridge { /// Constructor that accepts shared chain_operations from EvmConnector pub fn new_with_shared_operations( - source: Arc>, - destination: Arc>, + source: Arc, + destination: Arc, chain_operations: Arc>, - ) -> Arc> { - Arc::new({ - ComponentLock::new(Self { - observer: Arc::new(AtomicLock::new(SingleObserver::new())), - source, - destination, - chain_operations, - }) + ) -> Arc { + Arc::new(Self { + observer: Arc::new(AtomicLock::new(SingleObserver::new())), + source, + destination, + chain_operations, }) } } -impl IntoObservableSingleVTable for Erc20CollateralBridge { - fn set_observer(&mut self, observer: Box>) { +impl IntoObservableSingleVTableRef for Erc20CollateralBridge { + fn set_observer(&self, observer: Box>) { self.observer.write().set_observer(observer); } } impl CollateralBridge for Erc20CollateralBridge { - fn get_source(&self) -> Arc> { - (self.source).clone() as Arc> + fn get_source(&self) -> Arc { + (self.source).clone() as Arc } - fn get_destination(&self) -> Arc> { - (self.destination).clone() as Arc> + fn get_destination(&self) -> Arc { + (self.destination).clone() as Arc } fn transfer_funds( @@ -73,12 +71,12 @@ impl CollateralBridge for Erc20CollateralBridge { cumulative_fee: Amount, ) -> Result<()> { let observer = self.observer.clone(); - let source = self.source.read().unwrap().get_full_name(); - let destination = self.destination.read().unwrap().get_full_name(); + let source = self.source.get_full_name(); + let destination = self.destination.get_full_name(); // Get designation details - let source_designation = self.source.read().unwrap(); - let destination_designation = self.destination.read().unwrap(); + let source_designation = &self.source; + let destination_designation = &self.destination; // Use direct chain_operations.execute_command() for simple ERC20 transfer let command = ChainCommand::Erc20Transfer { @@ -93,19 +91,19 @@ impl CollateralBridge for Erc20CollateralBridge { let timestamp = Utc::now(); // Callback receives the original routing amounts passed through from chain operation /*observer - .read() - .publish_single(CollateralRouterEvent::HopComplete { - chain_id, - address, - client_order_id: client_order_id.clone(), - timestamp, - source: source.clone(), - destination: destination.clone(), - route_from: route_from.clone(), - route_to: route_to.clone(), - amount: total_transferred, // Now receives original_amount from chain operation - fee: fee_deducted, // Now receives original_cumulative_fee from chain operation - });*/ + .read() + .publish_single(CollateralRouterEvent::HopComplete { + chain_id, + address, + client_order_id: client_order_id.clone(), + timestamp, + source: source.clone(), + destination: destination.clone(), + route_from: route_from.clone(), + route_to: route_to.clone(), + amount: total_transferred, // Now receives original_amount from chain operation + fee: fee_deducted, // Now receives original_cumulative_fee from chain operation + });*/ Ok(()) }), }; diff --git a/_deprecated/alloy-evm-connector/src/evm_connector.rs b/_deprecated/alloy-evm-connector/src/evm_connector.rs index 4144bd8f..a7255576 100644 --- a/_deprecated/alloy-evm-connector/src/evm_connector.rs +++ b/_deprecated/alloy-evm-connector/src/evm_connector.rs @@ -153,17 +153,14 @@ impl EvmConnector { /// Generic bridge creation method - automatically selects bridge type based on designations pub fn create_bridge( &self, - source: Arc>, - destination: Arc>, - ) -> Arc> { - let source_name = source.read().unwrap().get_name(); - let destination_name = destination.read().unwrap().get_name(); + source: Arc, + destination: Arc, + ) -> Arc { + let source_name = source.get_name(); + let destination_name = destination.get_name(); // Determine bridge type based on cross-chain check - let is_cross_chain = source - .read() - .unwrap() - .is_cross_chain(&*destination.read().unwrap()); + let is_cross_chain = source.is_cross_chain(&destination); if is_cross_chain { // Cross-chain transfer - use Across bridge @@ -179,7 +176,7 @@ impl EvmConnector { self.chain_operations.clone(), ); - bridge as Arc> + bridge as Arc } else { // Same-chain transfer - use ERC20 bridge let bridge = Erc20CollateralBridge::new_with_shared_operations( @@ -188,16 +185,16 @@ impl EvmConnector { self.chain_operations.clone(), ); - bridge as Arc> + bridge as Arc } } /// Create a new AcrossCollateralBridge with shared chain_operations (legacy method) pub fn create_across_bridge( &self, - source: Arc>, - destination: Arc>, - ) -> Arc> { + source: Arc, + destination: Arc, + ) -> Arc { AcrossCollateralBridge::new_with_shared_operations( source, destination, @@ -208,9 +205,9 @@ impl EvmConnector { /// Create a new Erc20CollateralBridge with shared chain_operations (legacy method) pub fn create_erc20_bridge( &self, - source: Arc>, - destination: Arc>, - ) -> Arc> { + source: Arc, + destination: Arc, + ) -> Arc { Erc20CollateralBridge::new_with_shared_operations( source, destination, @@ -322,7 +319,7 @@ impl ChainConnector for EvmConnector { tracing::error!("Failed to send withdraw command: {}", e); } } - + fn poll_once(&self, chain_id: u32, address: Address, symbol: Symbol) { todo!() } diff --git a/examples/evm_connector/across_bridge.rs b/examples/evm_connector/across_bridge.rs index 312245a7..16ea6ac4 100644 --- a/examples/evm_connector/across_bridge.rs +++ b/examples/evm_connector/across_bridge.rs @@ -11,7 +11,7 @@ use alloy_evm_connector::utils::{IntoAmount, IntoEvmAmount}; use index_core::collateral::collateral_router::CollateralRouterEvent; use rust_decimal::dec; use symm_core::core::bits::Amount; -use symm_core::core::functional::IntoObservableSingleFun; +use symm_core::core::functional::{IntoObservableSingleFun, IntoObservableSingleFunRef}; use tokio::sync::watch; use tracing_subscriber; @@ -135,13 +135,9 @@ async fn main() { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; // Create designations with admin address - let source = Arc::new(ComponentLock::new(EvmCollateralDesignation::arbitrum_usdc( - admin_address, - ))); + let source = Arc::new(EvmCollateralDesignation::arbitrum_usdc(admin_address)); - let destination = Arc::new(ComponentLock::new(EvmCollateralDesignation::base_usdc( - admin_address, - ))); + let destination = Arc::new(EvmCollateralDesignation::base_usdc(admin_address)); // Create bridge using the generic method (it will automatically select Across bridge for cross-chain) let bridge = connector.create_bridge(source, destination); @@ -155,38 +151,33 @@ async fn main() { let (end_tx, mut end_rx) = watch::channel(false); - bridge - .write() - .unwrap() - .set_observer_fn(move |event: CollateralRouterEvent| match event { - CollateralRouterEvent::HopComplete { - chain_id: _, - address: _, - client_order_id: _, - timestamp: _, - source: _, - destination, - route_from: _, - route_to: _, + bridge.set_observer_fn(move |event: CollateralRouterEvent| match event { + CollateralRouterEvent::HopComplete { + chain_id: _, + address: _, + client_order_id: _, + timestamp: _, + source: _, + destination, + route_from: _, + route_to: _, + amount, + fee, + status, + } => { + tracing::info!( + "Bridge complete: {} USDC -> {} (fee: {})", amount, + destination, fee, - status - } => { - tracing::info!( - "Bridge complete: {} USDC -> {} (fee: {})", - amount, - destination, - fee, - ); - end_tx.send(true).expect("Failed to send ok"); - } - }); + ); + end_tx.send(true).expect("Failed to send ok"); + } + }); tracing::info!("Initiating cross-chain transfer: 10 USDC Arbitrum -> Base"); bridge - .write() - .unwrap() .transfer_funds( chain_id, admin_address, diff --git a/examples/evm_connector/erc20_bridge.rs b/examples/evm_connector/erc20_bridge.rs index bef0a93b..9d99448e 100644 --- a/examples/evm_connector/erc20_bridge.rs +++ b/examples/evm_connector/erc20_bridge.rs @@ -238,12 +238,8 @@ async fn main() { .expect("Failed to connect to Arbitrum"); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - let source_designation = Arc::new(ComponentLock::new(EvmCollateralDesignation::arbitrum_usdc( - address1, - ))); - let destination_designation = Arc::new(ComponentLock::new( - EvmCollateralDesignation::arbitrum_usdc(address2), - )); + let source_designation = Arc::new(EvmCollateralDesignation::arbitrum_usdc(address1)); + let destination_designation = Arc::new(EvmCollateralDesignation::arbitrum_usdc(address2)); let erc20_bridge = connector.create_bridge(source_designation.clone(), destination_designation.clone()); @@ -279,8 +275,7 @@ async fn main() { .unwrap(); tracing::info!(%allowance, %address1, %address2, "Allowance"); - let bridge = erc20_bridge.read().unwrap(); - bridge.transfer_funds( + erc20_bridge.transfer_funds( chain_id, from_address, client_order_id, diff --git a/examples/evm_connector/generic_bridge.rs b/examples/evm_connector/generic_bridge.rs index 28ae2924..28f9ec6f 100644 --- a/examples/evm_connector/generic_bridge.rs +++ b/examples/evm_connector/generic_bridge.rs @@ -35,13 +35,9 @@ async fn main() { tracing::info!("Test Case 1: Cross-chain transfer (ARBITRUM -> BASE)"); let wallet_address = address!("0xC0D3CB2E7452b8F4e7710bebd7529811868a85dd"); - let arbitrum_usdc = Arc::new(ComponentLock::new(EvmCollateralDesignation::arbitrum_usdc( - wallet_address, - ))); + let arbitrum_usdc = Arc::new(EvmCollateralDesignation::arbitrum_usdc(wallet_address)); - let base_usdc = Arc::new(ComponentLock::new(EvmCollateralDesignation::base_usdc( - wallet_address, - ))); + let base_usdc = Arc::new(EvmCollateralDesignation::base_usdc(wallet_address)); let cross_chain_bridge = connector.create_bridge(arbitrum_usdc.clone(), base_usdc.clone()); tracing::info!("Created bridge for cross-chain transfer"); @@ -50,9 +46,9 @@ async fn main() { tracing::info!("Test Case 2: Same-chain transfer (BASE -> BASE)"); let different_wallet_address = address!("0x833589fcd6edb6e08f4c7c32d4f71b54bda02913"); - let base_usdc_2 = Arc::new(ComponentLock::new(EvmCollateralDesignation::base_usdc( + let base_usdc_2 = Arc::new(EvmCollateralDesignation::base_usdc( different_wallet_address, // Different wallet address for demo - ))); + )); let same_chain_bridge = connector.create_bridge(base_usdc.clone(), base_usdc_2.clone()); tracing::info!("Created bridge for same-chain transfer"); diff --git a/examples/evm_connector/main.rs b/examples/evm_connector/main.rs index 5dbf5a30..085f2964 100644 --- a/examples/evm_connector/main.rs +++ b/examples/evm_connector/main.rs @@ -24,7 +24,7 @@ use symm_core::{ core::{ self, bits::{self, Amount, ClientOrderId}, - functional::{IntoObservableSingleFun, IntoObservableSingleVTable}, + functional::{IntoObservableSingleFun, IntoObservableSingleFunRef, IntoObservableSingleVTable}, logging::log_init, }, init_log, @@ -120,15 +120,15 @@ pub async fn main() { // === Create the bridge IMMEDIATELY after connector is up (bridges aren't dynamic) === // Use std::sync::RwLock for designations (as in the bridge sample) - let src_custody = Arc::new(std::sync::RwLock::new( + let src_custody = Arc::new( EvmCollateralDesignation::arbitrum_usdc_with_name(custody_a, "CUSTODY_A"), - )); - let dst_custody = Arc::new(std::sync::RwLock::new( + ); + let dst_custody = Arc::new( EvmCollateralDesignation::arbitrum_usdc_with_name(custody_b, "CUSTODY_B"), - )); + ); - let src_custody_name = src_custody.read().unwrap().get_full_name(); - let dst_custody_name = dst_custody.read().unwrap().get_full_name(); + let src_custody_name = src_custody.get_full_name(); + let dst_custody_name = dst_custody.get_full_name(); let bridge = evm_connector .write() @@ -140,27 +140,24 @@ pub async fn main() { custody_b ); - bridge - .write() - .unwrap() - .set_observer_fn(|e: CollateralRouterEvent| match e { - CollateralRouterEvent::HopComplete { - chain_id, - address, - client_order_id: _, - timestamp: _, - source, - destination, - route_from: _, - route_to: _, - amount, - fee, - status, - } => { - tracing::info!(%chain_id, %address, %amount, %source, %destination, %fee, + bridge.set_observer_fn(|e: CollateralRouterEvent| match e { + CollateralRouterEvent::HopComplete { + chain_id, + address, + client_order_id: _, + timestamp: _, + source, + destination, + route_from: _, + route_to: _, + amount, + fee, + status, + } => { + tracing::info!(%chain_id, %address, %amount, %source, %destination, %fee, "Collateral routing hop complete"); - } - }); + } + }); // Capture for event loop let bridge_for_events = bridge.clone(); @@ -198,8 +195,7 @@ pub async fn main() { } ChainNotification::Deposit { chain_id: ev_chain_id, seq_num, affiliate1, affiliate2, address, amount, timestamp } => { if address == custody_a_watch { - let guard = bridge_for_events.read().unwrap(); - match guard.transfer_funds( + match bridge_for_events.transfer_funds( chain_id, address, client_order_id.clone(), diff --git a/libs/alloy-chain-connector/src/collateral/otc_custody_to_wallet_bridge.rs b/libs/alloy-chain-connector/src/collateral/otc_custody_to_wallet_bridge.rs index 13fb1f2e..b866bf92 100644 --- a/libs/alloy-chain-connector/src/collateral/otc_custody_to_wallet_bridge.rs +++ b/libs/alloy-chain-connector/src/collateral/otc_custody_to_wallet_bridge.rs @@ -1,10 +1,13 @@ -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use chrono::Utc; -use eyre::{eyre, OptionExt}; -use index_core::collateral::{self, collateral_router::{ - self, CollateralBridge, CollateralDesignation, CollateralRouterEvent, CollateralRoutingStatus -}}; +use eyre::OptionExt; +use index_core::collateral:: + collateral_router::{ + CollateralBridge, CollateralDesignation, CollateralRouterEvent, + CollateralRoutingStatus, + } +; use parking_lot::RwLock as AtomicLock; use rust_decimal::dec; use safe_math::safe; @@ -13,7 +16,7 @@ use symm_core::core::{ bits::{Address, Amount, Symbol}, decimal_ext::DecimalExt, functional::{ - IntoObservableSingleVTable, NotificationHandlerOnce, OneShotSingleObserver, PublishSingle, SingleObserver + IntoObservableSingleVTableRef, NotificationHandlerOnce, OneShotSingleObserver, PublishSingle, SingleObserver }, }; @@ -27,15 +30,15 @@ use crate::{ pub struct OTCCustodyToWalletCollateralBridge { observer: Arc>>, - custody: Arc>, - wallet: Arc>, + custody: Arc, + wallet: Arc, gas_fee_calculator: GasFeeCalculator, } impl OTCCustodyToWalletCollateralBridge { pub fn new( - custody: Arc>, - wallet: Arc>, + custody: Arc, + wallet: Arc, gas_fee_calculator: GasFeeCalculator, ) -> Self { Self { @@ -48,12 +51,12 @@ impl OTCCustodyToWalletCollateralBridge { } impl CollateralBridge for OTCCustodyToWalletCollateralBridge { - fn get_source(&self) -> Arc> { - self.custody.clone() as Arc> + fn get_source(&self) -> Arc { + self.custody.clone() as Arc } - fn get_destination(&self) -> Arc> { - self.wallet.clone() as Arc> + fn get_destination(&self) -> Arc { + self.wallet.clone() as Arc } fn transfer_funds( @@ -66,48 +69,37 @@ impl CollateralBridge for OTCCustodyToWalletCollateralBridge { amount: Amount, cumulative_fee: Amount, ) -> eyre::Result<()> { - let (wallet_chain_id, wallet_address, wallet_token_address, wallet_name) = { - let wallet = self - .wallet - .read() - .map_err(|err| eyre!("Failed to obtain lock on wallet: {:?}", err))?; - ( - wallet.get_chain_id(), - wallet.get_address(), - wallet.get_token_address(), - wallet.get_full_name(), - ) - }; + let (wallet_chain_id, wallet_address, wallet_token_address, wallet_name) = ( + self.wallet.get_chain_id(), + self.wallet.get_address(), + self.wallet.get_token_address(), + self.wallet.get_full_name(), + ); (wallet_chain_id == chain_id) .then_some(()) .ok_or_eyre("Incorrect chain ID")?; - let custody = self - .custody - .read() - .map_err(|err| eyre!("Failed to obtain lock on custody: {:?}", err))?; - - (wallet_chain_id == custody.get_chain_id()) + (wallet_chain_id == self.custody.get_chain_id()) .then_some(()) .ok_or_eyre("Incorrect chain ID")?; - (wallet_token_address == custody.get_token_address()) + (wallet_token_address == self.custody.get_token_address()) .then_some(()) .ok_or_eyre("Incorrect token address")?; - let custody_name = custody.get_full_name(); + let custody_name = self.custody.get_full_name(); let outer_observer = self.observer.clone(); let outer_observer_clone = self.observer.clone(); let gas_fee_calculator = self.gas_fee_calculator.clone(); - + let client_order_id_clone = client_order_id.clone(); let client_order_id_clone_2 = client_order_id.clone(); let source_clone = custody_name.clone(); let destination_clone = wallet_name.clone(); let route_from_clone = route_from.clone(); let route_to_clone = route_to.clone(); - + // Charge at most 10%, we'll take the hit // TODO: Configure me let max_fee_rate = dec!(0.1); @@ -120,7 +112,8 @@ impl CollateralBridge for OTCCustodyToWalletCollateralBridge { %chain_id, %address, %client_order_id, %chargeable_fee, %gas_fee, "Computing gas fee" ); - let cumulative_fee = safe!(cumulative_fee + chargeable_fee).ok_or_eyre("Math problem")?; + let cumulative_fee = + safe!(cumulative_fee + chargeable_fee).ok_or_eyre("Math problem")?; let amount = safe!(amount - chargeable_fee).ok_or_eyre("Math problem")?; Ok((amount, cumulative_fee)) }; @@ -153,7 +146,7 @@ impl CollateralBridge for OTCCustodyToWalletCollateralBridge { route_to, amount, fee: cumulative_fee, - status: CollateralRoutingStatus::Success + status: CollateralRoutingStatus::Success, }); }); @@ -178,15 +171,14 @@ impl CollateralBridge for OTCCustodyToWalletCollateralBridge { }); }); - - custody.custody_to_address(wallet_address, amount, observer, error_observer)?; + self.custody.custody_to_address(wallet_address, amount, observer, error_observer)?; Ok(()) } } -impl IntoObservableSingleVTable for OTCCustodyToWalletCollateralBridge { - fn set_observer(&mut self, observer: Box>) { +impl IntoObservableSingleVTableRef for OTCCustodyToWalletCollateralBridge { + fn set_observer(&self, observer: Box>) { self.observer.write().set_observer(observer); } } diff --git a/libs/alloy-chain-connector/src/collateral/signer_to_wallet_bridge.rs b/libs/alloy-chain-connector/src/collateral/signer_to_wallet_bridge.rs index 7a645c0a..3ff7dd28 100644 --- a/libs/alloy-chain-connector/src/collateral/signer_to_wallet_bridge.rs +++ b/libs/alloy-chain-connector/src/collateral/signer_to_wallet_bridge.rs @@ -13,8 +13,7 @@ use symm_core::core::{ bits::{Address, Amount, Symbol}, decimal_ext::DecimalExt, functional::{ - IntoObservableSingleVTable, NotificationHandlerOnce, OneShotSingleObserver, PublishSingle, - SingleObserver, + IntoObservableSingleVTable, IntoObservableSingleVTableRef, NotificationHandlerOnce, OneShotSingleObserver, PublishSingle, SingleObserver }, }; @@ -28,15 +27,15 @@ use crate::{ pub struct SignerWalletToWalletCollateralBridge { observer: Arc>>, - signer_wallet: Arc>, - wallet: Arc>, + signer_wallet: Arc, + wallet: Arc, gas_fee_calculator: GasFeeCalculator, } impl SignerWalletToWalletCollateralBridge { pub fn new( - custody: Arc>, - wallet: Arc>, + custody: Arc, + wallet: Arc, gas_fee_calculator: GasFeeCalculator, ) -> Self { Self { @@ -49,12 +48,12 @@ impl SignerWalletToWalletCollateralBridge { } impl CollateralBridge for SignerWalletToWalletCollateralBridge { - fn get_source(&self) -> Arc> { - self.signer_wallet.clone() as Arc> + fn get_source(&self) -> Arc { + self.signer_wallet.clone() as Arc } - fn get_destination(&self) -> Arc> { - self.wallet.clone() as Arc> + fn get_destination(&self) -> Arc { + self.wallet.clone() as Arc } fn transfer_funds( @@ -67,37 +66,26 @@ impl CollateralBridge for SignerWalletToWalletCollateralBridge { amount: Amount, cumulative_fee: Amount, ) -> eyre::Result<()> { - let (wallet_chain_id, wallet_address, wallet_token_address, wallet_name) = { - let wallet = self - .wallet - .read() - .map_err(|err| eyre!("Failed to obtain lock on wallet: {:?}", err))?; - ( - wallet.get_chain_id(), - wallet.get_address(), - wallet.get_token_address(), - wallet.get_full_name(), - ) - }; + let (wallet_chain_id, wallet_address, wallet_token_address, wallet_name) = ( + self.wallet.get_chain_id(), + self.wallet.get_address(), + self.wallet.get_token_address(), + self.wallet.get_full_name(), + ); (wallet_chain_id == chain_id) .then_some(()) .ok_or_eyre("Incorrect chain ID")?; - let signer_wallet = self - .signer_wallet - .read() - .map_err(|err| eyre!("Failed to obtain lock on custody: {:?}", err))?; - - (wallet_chain_id == signer_wallet.get_chain_id()) + (wallet_chain_id == self.signer_wallet.get_chain_id()) .then_some(()) .ok_or_eyre("Incorrect chain ID")?; - (wallet_token_address == signer_wallet.get_token_address()) + (wallet_token_address == self.signer_wallet.get_token_address()) .then_some(()) .ok_or_eyre("Incorrect token address")?; - let signer_wallet_name = signer_wallet.get_full_name(); + let signer_wallet_name = self.signer_wallet.get_full_name(); let outer_observer = self.observer.clone(); let outer_observer_clone = self.observer.clone(); let gas_fee_calculator = self.gas_fee_calculator.clone(); @@ -180,14 +168,14 @@ impl CollateralBridge for SignerWalletToWalletCollateralBridge { }); }); - signer_wallet.transfer_to_account(wallet_address, amount, observer, error_observer)?; + self.signer_wallet.transfer_to_account(wallet_address, amount, observer, error_observer)?; Ok(()) } } -impl IntoObservableSingleVTable for SignerWalletToWalletCollateralBridge { - fn set_observer(&mut self, observer: Box>) { +impl IntoObservableSingleVTableRef for SignerWalletToWalletCollateralBridge { + fn set_observer(&self, observer: Box>) { self.observer.write().set_observer(observer); } } diff --git a/libs/index-core/src/collateral/collateral_router.rs b/libs/index-core/src/collateral/collateral_router.rs index fd715820..64782b82 100644 --- a/libs/index-core/src/collateral/collateral_router.rs +++ b/libs/index-core/src/collateral/collateral_router.rs @@ -1,7 +1,4 @@ -use std::{ - collections::HashMap, - sync::{Arc, RwLock as ComponentLock}, -}; +use std::{collections::HashMap, sync::Arc}; use chrono::{DateTime, Utc}; use itertools::Itertools; @@ -10,11 +7,12 @@ use eyre::{eyre, OptionExt, Result}; use derive_with_baggage::WithBaggage; use opentelemetry::propagation::Injector; +use symm_core::core::functional::IntoObservableSingleVTableRef; use symm_core::core::telemetry::{TracingData, WithBaggage}; use symm_core::core::{ bits::{Address, Amount, ClientOrderId, Side, Symbol}, - functional::{IntoObservableSingle, IntoObservableSingleVTable, PublishSingle, SingleObserver}, + functional::{IntoObservableSingle, PublishSingle, SingleObserver}, }; #[derive(Debug)] @@ -104,13 +102,13 @@ pub trait CollateralDesignation: Send + Sync { } pub trait CollateralBridge: - IntoObservableSingleVTable + Send + Sync + IntoObservableSingleVTableRef + Send + Sync { /// e.g. EVM:ARBITRUM:USDC - fn get_source(&self) -> Arc>; + fn get_source(&self) -> Arc; /// e.g. BINANCE:1:USDT - fn get_destination(&self) -> Arc>; + fn get_destination(&self) -> Arc; /// Transfer funds from this designation to target designation /// @@ -142,7 +140,7 @@ pub trait CollateralBridge: pub struct CollateralRouter { observer: SingleObserver, - bridges: HashMap<(Symbol, Symbol), Arc>>, + bridges: HashMap<(Symbol, Symbol), Arc>, routes: Vec>, chain_sources: HashMap<(u32, Symbol), Symbol>, default_destination: Option, @@ -159,24 +157,11 @@ impl CollateralRouter { } } - pub fn add_bridge(&mut self, bridge: Arc>) -> Result<()> { - let key = (|| -> Result<(Symbol, Symbol)> { - let bridge = bridge - .read() - .map_err(|e| eyre!("Failed to access bridge {}", e))?; - Ok(( - bridge - .get_source() - .read() - .map_err(|e| eyre!("Failed to access source {}", e))? - .get_full_name(), - bridge - .get_destination() - .read() - .map_err(|e| eyre!("Failed to access destination {}", e))? - .get_full_name(), - )) - })()?; + pub fn add_bridge(&mut self, bridge: Arc) -> Result<()> { + let key = ( + bridge.get_source().get_full_name(), + bridge.get_destination().get_full_name(), + ); self.bridges .insert(key, bridge) .is_none() @@ -184,7 +169,7 @@ impl CollateralRouter { .ok_or_eyre("Duplicate designation ID") } - pub fn get_bridges(&self) -> Vec>> { + pub fn get_bridges(&self) -> Vec> { self.bridges.values().cloned().collect_vec() } @@ -246,18 +231,15 @@ impl CollateralRouter { let first_hop = self.next_hop(transfer_from, transfer_from, &transfer_to)?; - first_hop - .write() - .map_err(|e| eyre!("Failed to access next hop {}", e))? - .transfer_funds( - chain_id, - address, - client_order_id, - transfer_from.clone(), - transfer_to.clone(), - amount, - Amount::ZERO, // we could charge some initial fee too! - ) + first_hop.transfer_funds( + chain_id, + address, + client_order_id, + transfer_from.clone(), + transfer_to.clone(), + amount, + Amount::ZERO, // we could charge some initial fee too! + ) } fn next_hop( @@ -265,7 +247,7 @@ impl CollateralRouter { source: &Symbol, route_from: &Symbol, route_to: &Symbol, - ) -> Result<&Arc>> { + ) -> Result<&Arc> { let route = self .routes .iter() @@ -302,13 +284,13 @@ impl CollateralRouter { Ok(next_hop) } - pub fn process_routing(&mut self, timestamp: DateTime) -> Result<()> { + pub fn process_routing(&self, timestamp: DateTime) -> Result<()> { let _ = timestamp; // TODO: Opportunity to implement retry logic for failed routes Ok(()) } - pub fn handle_collateral_router_event(&mut self, event: CollateralRouterEvent) -> Result<()> { + pub fn handle_collateral_router_event(&self, event: CollateralRouterEvent) -> Result<()> { match event { CollateralRouterEvent::HopComplete { chain_id, @@ -364,14 +346,11 @@ impl CollateralRouter { Ok(()) } else { let next_hop = self.next_hop(&destination, &route_from, &route_to)?; - let next_hop = next_hop - .write() - .map_err(|e| eyre!("Failed to access next hop {}", e))?; tracing::info!(%chain_id, %address, %client_order_id, %route_from, %route_to, %source, - next_source = %next_hop.get_source().read().map(|x| x.get_full_name()).unwrap_or_default(), - next_destination = %next_hop.get_destination().read().map(|x| x.get_full_name()).unwrap_or_default(), + next_source = %next_hop.get_source().get_full_name(), + next_destination = %next_hop.get_destination().get_full_name(), %amount, %fee, "Route Hop"); @@ -401,17 +380,17 @@ pub mod test_util { use chrono::{DateTime, Utc}; use eyre::Result; use itertools::Itertools; + use parking_lot::RwLock; use rust_decimal::dec; use std::{ collections::HashMap, - sync::{mpsc::Sender, Arc, RwLock as ComponentLock}, + sync::{mpsc::Sender, Arc}, }; use symm_core::core::{ bits::{Address, Amount, ClientOrderId, Symbol}, functional::{ - IntoObservableSingle, IntoObservableSingleVTable, NotificationHandlerOnce, - PublishSingle, SingleObserver, + IntoObservableSingleFunRef, IntoObservableSingleVTableRef, NotificationHandlerOnce, PublishSingle, SingleObserver }, test_util::{get_mock_index_name_1, get_mock_index_name_2, get_mock_index_name_3}, }; @@ -460,20 +439,20 @@ pub mod test_util { } pub struct MockCollateralBridge { - observer: SingleObserver, - pub implementor: SingleObserver, - pub source: Arc>, - pub destination: Arc>, + observer: RwLock>, + pub implementor: RwLock>, + pub source: Arc, + pub destination: Arc, } impl MockCollateralBridge { pub fn new( - source: Arc>, - destination: Arc>, + source: Arc, + destination: Arc, ) -> Self { Self { - observer: SingleObserver::new(), - implementor: SingleObserver::new(), + observer: RwLock::new(SingleObserver::new()), + implementor: RwLock::new(SingleObserver::new()), source, destination, } @@ -491,13 +470,14 @@ pub mod test_util { fee: Amount, ) { self.observer + .read() .publish_single(CollateralRouterEvent::HopComplete { chain_id, address, client_order_id, timestamp, - source: self.source.read().unwrap().get_full_name(), - destination: self.destination.read().unwrap().get_full_name(), + source: self.source.get_full_name(), + destination: self.destination.get_full_name(), route_from, route_to, amount, @@ -507,28 +487,19 @@ pub mod test_util { } } - impl IntoObservableSingle for MockCollateralBridge { - fn get_single_observer_mut(&mut self) -> &mut SingleObserver { - &mut self.observer - } - } - - impl IntoObservableSingleVTable for MockCollateralBridge { - fn set_observer( - &mut self, - observer: Box>, - ) { - self.get_single_observer_mut().set_observer(observer); + impl IntoObservableSingleVTableRef for MockCollateralBridge { + fn set_observer(&self, observer: Box>) { + self.observer.write().set_observer(observer); } } impl CollateralBridge for MockCollateralBridge { - fn get_source(&self) -> Arc> { - (self.source).clone() as Arc> + fn get_source(&self) -> Arc { + (self.source).clone() as Arc } - fn get_destination(&self) -> Arc> { - (self.destination).clone() as Arc> + fn get_destination(&self) -> Arc { + (self.destination).clone() as Arc } fn transfer_funds( @@ -541,8 +512,8 @@ pub mod test_util { amount: Amount, cumulative_fee: Amount, ) -> Result<()> { - self.implementor - .publish_single(MockCollateralBridgeInternalEvent::TransferFunds { + self.implementor.read().publish_single( + MockCollateralBridgeInternalEvent::TransferFunds { chain_id, address, client_order_id, @@ -550,101 +521,87 @@ pub mod test_util { route_to, amount, cumulative_fee, - }); + }, + ); Ok(()) } } /// Make mocked designation - pub fn make_mock_designation(full_name: &str) -> Arc> { + pub fn make_mock_designation(full_name: &str) -> Arc { let (t, n, c) = full_name.split(":").collect_tuple().unwrap(); - Arc::new(ComponentLock::new(MockCollateralDesignation { + Arc::new(MockCollateralDesignation { type_: t.to_owned().into(), name: n.to_owned().into(), collateral_symbol: c.to_owned().into(), full_name: full_name.to_owned().into(), balance: dec!(0.0), - })) + }) } /// Make mocked bridge w/o implementation pub fn make_mock_bridge( - from: &Arc>, - to: &Arc>, - ) -> Arc> { - Arc::new(ComponentLock::new(MockCollateralBridge::new( - from.clone(), - to.clone(), - ))) + from: &Arc, + to: &Arc, + ) -> Arc { + Arc::new(MockCollateralBridge::new(from.clone(), to.clone())) } /// Implement mocked bridge using fee calculation function pub fn implement_mock_bridge( tx: &Sender>, - bridge: &Arc>, - router: &Arc>, + bridge: &Arc, + router: &Arc>, calculate_fee: &Arc (Amount, Amount) + Send + Sync>, ) { // Send bridge events into router let tx_clone = tx.clone(); let router_weak = Arc::downgrade(&router); - bridge - .write() - .unwrap() - .get_single_observer_mut() - .set_observer_fn(move |e| { - let router = router_weak.upgrade().unwrap(); - tx_clone - .send(Box::new(move || { - router - .write() - .unwrap() - .handle_collateral_router_event(e) - .unwrap(); - })) - .unwrap(); - }); + bridge.set_observer_fn(move |e| { + let router = router_weak.upgrade().unwrap(); + tx_clone + .send(Box::new(move || { + router.read().handle_collateral_router_event(e).unwrap(); + })) + .unwrap(); + }); // Implement bridge let tx_clone = tx.clone(); let bridge_weak = Arc::downgrade(bridge); let calculate_fee = calculate_fee.clone(); - bridge - .write() - .unwrap() - .implementor - .set_observer_fn(move |e| { - let bridge = bridge_weak.upgrade().unwrap(); - let calculate_fee = calculate_fee.clone(); - tx_clone - .send(Box::new(move || match e { - MockCollateralBridgeInternalEvent::TransferFunds { + bridge.implementor.write().set_observer_fn(move |e| { + let bridge = bridge_weak.upgrade().unwrap(); + let calculate_fee = calculate_fee.clone(); + tx_clone + .send(Box::new(move || match e { + MockCollateralBridgeInternalEvent::TransferFunds { + chain_id, + address, + client_order_id, + route_from, + route_to, + amount, + cumulative_fee, + } => { + let (amount, cumulative_fee) = calculate_fee(amount, cumulative_fee); + bridge.notify_collateral_router_event( chain_id, address, client_order_id, + Utc::now(), route_from, route_to, amount, cumulative_fee, - } => { - let (amount, cumulative_fee) = calculate_fee(amount, cumulative_fee); - bridge.write().unwrap().notify_collateral_router_event( - chain_id, - address, - client_order_id, - Utc::now(), - route_from, - route_to, - amount, - cumulative_fee, - ); - } - })) - .unwrap(); - }); + ); + } + })) + .unwrap(); + }); // Add bridge to the router - router.write().unwrap().add_bridge(bridge.clone()).unwrap(); + router.write().add_bridge(bridge.clone()).unwrap(); } /// Create mocked router for unit-tests @@ -656,7 +613,7 @@ pub mod test_util { source_chain_map: &[(u32, &str)], default_designation: &str, calculate_fee: impl Fn(Amount, Amount) -> (Amount, Amount) + Send + Sync + 'static, - ) -> Arc> { + ) -> Arc> { let designations: HashMap = HashMap::from_iter( designations .iter() @@ -670,7 +627,7 @@ pub mod test_util { }) .collect_vec(); - let router = Arc::new(ComponentLock::new(CollateralRouter::new())); + let router = Arc::new(RwLock::new(CollateralRouter::new())); let calculate_fee: Arc (Amount, Amount) + Send + Sync + 'static> = Arc::new(calculate_fee); @@ -689,7 +646,6 @@ pub mod test_util { for symbol in &index_symbols { router .write() - .unwrap() .add_chain_source(*chain_id, symbol.clone(), source.to_owned().into()) .unwrap(); } @@ -697,7 +653,6 @@ pub mod test_util { router .write() - .unwrap() .set_default_destination(default_designation.to_owned().into()) .unwrap(); @@ -707,7 +662,7 @@ pub mod test_util { .map(|n| n.to_owned()) .map(Symbol::from) .collect_vec(); - router.write().unwrap().add_route(&route).unwrap(); + router.write().add_route(&route).unwrap(); } router @@ -774,7 +729,6 @@ mod test { router .write() - .unwrap() .get_single_observer_mut() .set_observer_fn(move |e| match e { CollateralTransferEvent::TransferComplete { @@ -815,8 +769,7 @@ mod test { // It will be coming from given Chain ID, and // it will be routed to final designation. router - .write() - .unwrap() + .read() .transfer_collateral( expected_chain_id, get_mock_address_1(), diff --git a/libs/symm-core/src/core/functional.rs b/libs/symm-core/src/core/functional.rs index a3af988c..e0763cee 100644 --- a/libs/symm-core/src/core/functional.rs +++ b/libs/symm-core/src/core/functional.rs @@ -175,6 +175,28 @@ where } } +pub trait IntoObservableSingleVTableRef: Send + Sync { + fn set_observer(&self, observer: Box>); +} + +pub trait IntoObservableSingleFunRef: Send + Sync { + fn set_observer_fn(&self, observer: impl NotificationHandlerOnce + 'static); + fn set_observer_from(&self, observer: impl IntoNotificationHandlerOnceBox); +} + +impl IntoObservableSingleFunRef for A +where + A: IntoObservableSingleVTableRef + ?Sized, +{ + fn set_observer_fn(&self, observer: impl NotificationHandlerOnce + 'static) { + self.set_observer(Box::new(observer)); + } + + fn set_observer_from(&self, observer: impl IntoNotificationHandlerOnceBox) { + self.set_observer(observer.into_notification_handler_once_box()); + } +} + /// Notifications can be handled by multiple handler, and so they must be passed /// by reference pub trait NotificationHandler: Send + Sync { diff --git a/src/app/chain_connector.rs b/src/app/chain_connector.rs index aa428848..eba54b2d 100644 --- a/src/app/chain_connector.rs +++ b/src/app/chain_connector.rs @@ -5,7 +5,6 @@ use std::{ use alloy_chain_connector::{ chain_connector::{GasFeeCalculator, RealChainConnector}, - chain_connector_sender::RealChainConnectorSender, collateral::{ otc_custody_designation::OTCCustodyCollateralDesignation, otc_custody_to_wallet_bridge::OTCCustodyToWalletCollateralBridge, @@ -206,20 +205,19 @@ impl RealChainConnectorConfigBuilder { .map(|x| x.deployment_data.index_deploy_data.collateral_token) .ok_or_eyre("Configuration contains no indexes")?; - let trading_custody = Arc::new(ComponentLock::new(WalletCollateralDesignation::new( + let trading_custody = Arc::new(WalletCollateralDesignation::new( Symbol::from("TradeRoute"), Symbol::from(trade_route.to_string()), main_quote_currency.clone(), chain_id, trade_route, collateral_token, - ))); + )); - let trading_custody_name = trading_custody.read().unwrap().get_full_name(); + let trading_custody_name = trading_custody.get_full_name(); router .write() - .unwrap() .set_default_destination(trading_custody_name.clone())?; tracing::info!("✅ Set default trading custody"); @@ -265,28 +263,26 @@ impl RealChainConnectorConfigBuilder { tracing::info!("✅ Added Index to RPC handlers"); - let index_custody = - Arc::new(ComponentLock::new(OTCCustodyCollateralDesignation::new( - sender.clone(), - Symbol::from("OTCCustody"), - Symbol::from(index_address.to_string()), - main_quote_currency.clone(), - chain_id, - custody_address, - collateral_token, - custody_id, - ))); - - let bridge_to_trading = - Arc::new(ComponentLock::new(OTCCustodyToWalletCollateralBridge::new( - index_custody.clone(), - trading_custody.clone(), - gas_fee_calculator.clone(), - ))); - - let mut router_write = router.write().unwrap(); - - let index_custody_name = index_custody.read().unwrap().get_full_name(); + let index_custody = Arc::new(OTCCustodyCollateralDesignation::new( + sender.clone(), + Symbol::from("OTCCustody"), + Symbol::from(index_address.to_string()), + main_quote_currency.clone(), + chain_id, + custody_address, + collateral_token, + custody_id, + )); + + let bridge_to_trading = Arc::new(OTCCustodyToWalletCollateralBridge::new( + index_custody.clone(), + trading_custody.clone(), + gas_fee_calculator.clone(), + )); + + let mut router_write = router.write(); + + let index_custody_name = index_custody.get_full_name(); router_write.add_bridge(bridge_to_trading)?; diff --git a/src/app/collateral_router.rs b/src/app/collateral_router.rs index 349a020c..8c5fb2c9 100644 --- a/src/app/collateral_router.rs +++ b/src/app/collateral_router.rs @@ -1,10 +1,11 @@ -use std::sync::{Arc, RwLock as ComponentLock}; +use std::sync::Arc; use super::config::ConfigBuildError; use derive_builder::Builder; use eyre::{OptionExt, Result}; use index_core::collateral::collateral_router::CollateralRouter; +use parking_lot::RwLock as AtomicLock; #[derive(Clone, Builder)] #[builder( @@ -13,7 +14,7 @@ use index_core::collateral::collateral_router::CollateralRouter; )] pub struct CollateralRouterConfig { #[builder(setter(skip))] - router: Option>>, + router: Option>>, } impl CollateralRouterConfig { @@ -22,11 +23,11 @@ impl CollateralRouterConfig { CollateralRouterConfigBuilder::default() } - pub fn expect_router_cloned(&self) -> Arc> { + pub fn expect_router_cloned(&self) -> Arc> { self.router.clone().ok_or(()).expect("Failed to get router") } - pub fn try_get_collateral_router_cloned(&self) -> Result>> { + pub fn try_get_collateral_router_cloned(&self) -> Result>> { self.router .clone() .ok_or_eyre("Failed to get collateral router") @@ -39,7 +40,7 @@ impl CollateralRouterConfigBuilder { config .router - .replace(Arc::new(ComponentLock::new(CollateralRouter::new()))); + .replace(Arc::new(AtomicLock::new(CollateralRouter::new()))); Ok(config) } diff --git a/src/app/simple_router.rs b/src/app/simple_router.rs index 7e5a8fdc..420e967e 100644 --- a/src/app/simple_router.rs +++ b/src/app/simple_router.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, RwLock as ComponentLock}; +use std::sync::Arc; use crate::app::collateral_router::CollateralRouterConfig; @@ -6,20 +6,21 @@ use super::config::ConfigBuildError; use derive_builder::Builder; use chrono::Utc; +use parking_lot::RwLock; use rust_decimal::dec; use safe_math::safe; use symm_core::core::{ bits::{Address, Amount, ClientOrderId, Symbol}, decimal_ext::DecimalExt, functional::{ - IntoObservableSingleVTable, NotificationHandlerOnce, PublishSingle, SingleObserver, + IntoObservableSingleVTableRef, NotificationHandlerOnce, PublishSingle, SingleObserver, }, }; use eyre::{OptionExt, Result}; -use index_core::collateral::{self, collateral_router::{ - self, CollateralBridge, CollateralDesignation, CollateralRouterEvent, CollateralRoutingStatus -}}; +use index_core::collateral::collateral_router::{ + CollateralBridge, CollateralDesignation, CollateralRouterEvent, CollateralRoutingStatus, +}; struct SimpleDesignation { type_: Symbol, @@ -77,35 +78,33 @@ impl CollateralDesignation for SimpleDesignation { } struct SimpleBridge { - observer: SingleObserver, - source: Arc>, - destination: Arc>, + observer: RwLock>, + source: Arc, + destination: Arc, } impl SimpleBridge { fn new(source: &str, symbol: &str, destination: &str) -> Self { Self { - observer: SingleObserver::new(), - source: Arc::new(ComponentLock::new(SimpleDesignation::new_with_symbol( - source, symbol, - ))), - destination: Arc::new(ComponentLock::new(SimpleDesignation::new(destination))), + observer: RwLock::new(SingleObserver::new()), + source: Arc::new(SimpleDesignation::new_with_symbol(source, symbol)), + destination: Arc::new(SimpleDesignation::new(destination)), } } } -impl IntoObservableSingleVTable for SimpleBridge { - fn set_observer(&mut self, observer: Box>) { - self.observer.set_observer(observer); +impl IntoObservableSingleVTableRef for SimpleBridge { + fn set_observer(&self, observer: Box>) { + self.observer.write().set_observer(observer); } } impl CollateralBridge for SimpleBridge { - fn get_source(&self) -> Arc> { + fn get_source(&self) -> Arc { self.source.clone() } - fn get_destination(&self) -> Arc> { + fn get_destination(&self) -> Arc { self.destination.clone() } @@ -124,18 +123,19 @@ impl CollateralBridge for SimpleBridge { let amount = safe!(amount - fee).ok_or_eyre("Math problem")?; let cumulative_fee = safe!(cumulative_fee + fee).ok_or_eyre("Math problem")?; self.observer + .read() .publish_single(CollateralRouterEvent::HopComplete { chain_id, address, client_order_id, timestamp: Utc::now(), - source: self.source.read().unwrap().get_full_name(), - destination: self.destination.read().unwrap().get_full_name(), + source: self.source.get_full_name(), + destination: self.destination.get_full_name(), route_from, route_to, amount, fee: cumulative_fee, - status: CollateralRoutingStatus::Success + status: CollateralRoutingStatus::Success, }); Ok(()) } @@ -179,42 +179,31 @@ impl SimpleCollateralRouterConfigBuilder { let collateral_router = config.try_get_collateral_router_cloned()?; - if let Ok(mut router) = collateral_router.write() { - let chain_id = simple_config.chain_id; - let destination = simple_config.destination; - - for symbol in simple_config.index_symbols { - let bridge = Arc::new(ComponentLock::new(SimpleBridge::new( - simple_config.source.as_str(), - &symbol, - &destination, - ))); - - // should never panic. - let source = bridge - .read() - .unwrap() - .get_source() - .read() - .unwrap() - .get_full_name() - .to_string(); - - router.add_bridge(bridge)?; - router.add_chain_source(chain_id, symbol, Symbol::from(source.as_str()))?; - router.add_route(&[ - Symbol::from(source.as_str()), - Symbol::from(destination.as_str()), - ])?; - } - - router.set_default_destination(Symbol::from(destination))?; - } else { - Err(ConfigBuildError::Other(String::from( - "Failed to obtain lock on collateral router", - )))?; + let mut router_write = collateral_router.write(); + + let chain_id = simple_config.chain_id; + let destination = simple_config.destination; + + for symbol in simple_config.index_symbols { + let bridge = Arc::new(SimpleBridge::new( + simple_config.source.as_str(), + &symbol, + &destination, + )); + + // should never panic. + let source = bridge.get_source().get_full_name().to_string(); + + router_write.add_bridge(bridge)?; + router_write.add_chain_source(chain_id, symbol, Symbol::from(source.as_str()))?; + router_write.add_route(&[ + Symbol::from(source.as_str()), + Symbol::from(destination.as_str()), + ])?; } + router_write.set_default_destination(Symbol::from(destination))?; + Ok(config) } } diff --git a/src/app/solver.rs b/src/app/solver.rs index 66846257..57c2071e 100644 --- a/src/app/solver.rs +++ b/src/app/solver.rs @@ -41,7 +41,7 @@ use symm_core::{ bits::Amount, functional::{ IntoObservableManyArc, IntoObservableManyFun, IntoObservableSingle, - IntoObservableSingleFun, + IntoObservableSingleFun, IntoObservableSingleFunRef, }, persistence::{util::JsonFilePersistence, Persist}, telemetry::{crossbeam::unbounded_traceable, TraceableEvent}, @@ -321,26 +321,10 @@ impl SolverConfig { .with_router .try_get_collateral_router_cloned()?; - let collateral_bridges = collateral_router - .read() - .map_err(|err| { - eyre!( - "Failed to obtain lock on collateral router manager: {:?}", - err - ) - })? - .get_bridges(); + let collateral_bridges = collateral_router.read().get_bridges(); for bridge in collateral_bridges { - bridge - .write() - .map_err(|err| { - eyre!( - "Failed to obtain lock on collateral bridge manager: {:?}", - err - ) - })? - .set_observer_from(router_event_tx.clone()); + bridge.set_observer_from(router_event_tx.clone()); } order_sender.write().set_observer_from(order_event_tx); @@ -369,7 +353,6 @@ impl SolverConfig { collateral_router .write() - .map_err(|err| eyre!("Failed to obtain lock on collateral router: {:?}", err))? .get_single_observer_mut() .set_observer_from(transfer_event_tx); @@ -416,13 +399,10 @@ impl SolverConfig { Ok(event) => { event.with_tracing(|notification| { tracing::trace!("Router event"); - match collateral_router.write() { - Ok(mut router) => if let Err(err) = router.handle_collateral_router_event(notification) { - tracing::warn!("Failed to handle collateral router event: {:?}", err); - } - Err(err) => { - tracing::warn!("Failed to obtain lock on collateral router: {:?}", err); - } + if let Err(err) = collateral_router + .read() + .handle_collateral_router_event(notification) { + tracing::warn!("Failed to handle collateral router event: {:?}", err); } }) } diff --git a/src/collateral/collateral_manager.rs b/src/collateral/collateral_manager.rs index 4df4f39a..2970c646 100644 --- a/src/collateral/collateral_manager.rs +++ b/src/collateral/collateral_manager.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, VecDeque}, - sync::{Arc, RwLock as ComponentLock}, + sync::Arc, }; use alloy_primitives::U256; @@ -12,6 +12,7 @@ use eyre::{eyre, OptionExt, Result}; use derive_with_baggage::WithBaggage; use opentelemetry::propagation::Injector; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use symm_core::core::{ persistence::{Persist, Persistence}, @@ -87,7 +88,7 @@ pub trait CollateralManagerHost: SetSolverOrderStatus { pub struct CollateralManager { observer: SingleObserver, - router: Arc>, + router: Arc>, persistence: Arc, client_funds: HashMap<(u32, Address), ArcSwap>, collateral_management_requests: VecDeque, @@ -96,7 +97,7 @@ pub struct CollateralManager { impl CollateralManager { pub fn new( - router: Arc>, + router: Arc>, persistence: Arc, zero_threshold: Amount, ) -> Self { @@ -118,10 +119,7 @@ impl CollateralManager { let process_collateral_span = span!(Level::TRACE, "process-collateral"); let _guard = process_collateral_span.enter(); - self.router - .write() - .map_err(|e| eyre!("Failed to access router {}", e)) - .and_then(|mut x| x.process_routing(timestamp))?; + self.router.read().process_routing(timestamp)?; let requests = VecDeque::from_iter(self.collateral_management_requests.drain(..)); @@ -182,20 +180,14 @@ impl CollateralManager { .into_iter() .filter_map(|request| { request.add_span_context_link(); - match self - .router - .write() - .map_err(|e| eyre!("Failed to access router {}", e)) - .and_then(|x| { - x.transfer_collateral( - request.chain_id, - request.address, - request.client_order_id.clone(), - request.symbol.clone(), - request.side, - request.collateral_amount, - ) - }) { + match self.router.read().transfer_collateral( + request.chain_id, + request.address, + request.client_order_id.clone(), + request.symbol.clone(), + request.side, + request.collateral_amount, + ) { Ok(()) => None, Err(err) => Some((err, request)), } @@ -847,7 +839,6 @@ mod test { let collateral_manager_weak = Arc::downgrade(&collateral_manager); router .write() - .unwrap() .get_single_observer_mut() .set_observer_fn(move |e| { let collateral_manager = collateral_manager_weak.upgrade().unwrap(); diff --git a/src/solver/solver.rs b/src/solver/solver.rs index 5ab0ba0a..96f69fe1 100644 --- a/src/solver/solver.rs +++ b/src/solver/solver.rs @@ -322,7 +322,8 @@ impl Solver { /// Initiate graceful shutdown by setting status to ShuttingDown pub fn initiate_shutdown(&self) { - self.status.store(SolverStatus::ShuttingDown.into(), Ordering::SeqCst); + self.status + .store(SolverStatus::ShuttingDown.into(), Ordering::SeqCst); tracing::info!("Solver shutdown initiated - status set to ShuttingDown"); } @@ -1954,7 +1955,7 @@ mod test { assert_decimal_approx_eq, core::{ bits::{PricePointEntry, SingleOrder}, - functional::{IntoObservableMany, IntoObservableSingle}, + functional::{IntoObservableMany, IntoObservableSingle, IntoObservableSingleFunRef}, logging::log_init, persistence::util::InMemoryPersistence, test_util::{ @@ -2112,99 +2113,72 @@ mod test { let chain_connector = Arc::new(ComponentLock::new(MockChainConnector::new())); let fix_server = Arc::new(RwLock::new(MockServer::new())); - let collateral_designation_1 = Arc::new(ComponentLock::new(MockCollateralDesignation { + let collateral_designation_1 = Arc::new(MockCollateralDesignation { type_: "T1".into(), name: "D1".into(), collateral_symbol: "C1".into(), full_name: "T1:D1:C1".into(), balance: dec!(0.0), - })); + }); - let collateral_designation_2 = Arc::new(ComponentLock::new(MockCollateralDesignation { + let collateral_designation_2 = Arc::new(MockCollateralDesignation { type_: "T2".into(), name: "D2".into(), collateral_symbol: "C2".into(), full_name: "T2:D2:C2".into(), balance: dec!(0.0), - })); + }); - let collateral_designation_3 = Arc::new(ComponentLock::new(MockCollateralDesignation { + let collateral_designation_3 = Arc::new(MockCollateralDesignation { type_: "T3".into(), name: "D3".into(), collateral_symbol: "C3".into(), full_name: "T3:D3:C3".into(), balance: dec!(0.0), - })); + }); - let collateral_bridge_1 = Arc::new(ComponentLock::new(MockCollateralBridge::new( + let collateral_bridge_1 = Arc::new(MockCollateralBridge::new( collateral_designation_1.clone(), collateral_designation_2.clone(), - ))); + )); - let collateral_bridge_2 = Arc::new(ComponentLock::new(MockCollateralBridge::new( + let collateral_bridge_2 = Arc::new(MockCollateralBridge::new( collateral_designation_2.clone(), collateral_designation_3.clone(), - ))); + )); - let collateral_router = Arc::new(ComponentLock::new(CollateralRouter::new())); + let collateral_router = Arc::new(RwLock::new(CollateralRouter::new())); collateral_router .write() - .unwrap() .add_bridge(collateral_bridge_1.clone()) .expect("Failed to add bridge"); collateral_router .write() - .unwrap() .add_bridge(collateral_bridge_2.clone()) .expect("Failed to add bridge"); collateral_router .write() - .unwrap() .add_chain_source( chain_id, get_mock_index_name_1(), - collateral_designation_1 - .read() - .unwrap() - .get_full_name() - .clone(), + collateral_designation_1.get_full_name().clone(), ) .expect("Failed to add chain source"); collateral_router .write() - .unwrap() - .set_default_destination( - collateral_designation_3 - .read() - .unwrap() - .get_full_name() - .clone(), - ) + .set_default_destination(collateral_designation_3.get_full_name().clone()) .expect("Failed to set default destination"); collateral_router .write() - .unwrap() .add_route(&[ - collateral_designation_1 - .read() - .unwrap() - .get_full_name() - .clone(), - collateral_designation_2 - .read() - .unwrap() - .get_full_name() - .clone(), - collateral_designation_3 - .read() - .unwrap() - .get_full_name() - .clone(), + collateral_designation_1.get_full_name().clone(), + collateral_designation_2.get_full_name().clone(), + collateral_designation_3.get_full_name().clone(), ]) .expect("Failed to add route"); @@ -2317,21 +2291,12 @@ mod test { .get_single_observer_mut() .set_observer_from(collateral_sender); - collateral_bridge_1 - .write() - .unwrap() - .get_single_observer_mut() - .set_observer_from(collateral_router_sender.clone()); + collateral_bridge_1.set_observer_from(collateral_router_sender.clone()); - collateral_bridge_2 - .write() - .unwrap() - .get_single_observer_mut() - .set_observer_from(collateral_router_sender); + collateral_bridge_2.set_observer_from(collateral_router_sender); collateral_router .write() - .unwrap() .get_single_observer_mut() .set_observer_from(collateral_transfer_sender); @@ -2635,14 +2600,13 @@ mod test { }); let impl_collateral_bridge = - move |collateral_bridge: &Arc>, + move |collateral_bridge: &Arc, mock_bridge_sender: Sender, defer_2: Sender>| { let collateral_bridge_weak = Arc::downgrade(collateral_bridge); collateral_bridge - .write() - .unwrap() .implementor + .write() .set_observer_fn(move |event| { let collateral_bridge = collateral_bridge_weak.upgrade().unwrap(); match &event { @@ -2673,19 +2637,16 @@ mod test { let timestamp = Utc::now(); defer_2 .send(Box::new(move || { - collateral_bridge - .write() - .unwrap() - .notify_collateral_router_event( - chain_id, - address, - client_order_id, - timestamp, - route_from, - route_to, - amount - fee, - cumulative_fee, - ); + collateral_bridge.notify_collateral_router_event( + chain_id, + address, + client_order_id, + timestamp, + route_from, + route_to, + amount - fee, + cumulative_fee, + ); })) .expect("Failed to send"); } @@ -2741,7 +2702,6 @@ mod test { .expect("Failed to handle bridge event"), recv(collateral_router_receiver) -> res => collateral_router.write() - .unwrap() .handle_collateral_router_event(res.unwrap()) .map_err(|e| eyre!("{:?}", e)) .expect("Failed to handle router event"),