diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 58fcbe58eac..348f68882e7 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -59,8 +59,8 @@ use super::lp_network::P2PRequestResult; use crate::lp_network::{broadcast_p2p_msg, Libp2pPeerId, P2PProcessError, P2PProcessResult, P2PRequestError}; -use crate::lp_swap::maker_swap_v2::{MakerSwapStateMachine, MakerSwapStorage}; -use crate::lp_swap::taker_swap_v2::{TakerSwapStateMachine, TakerSwapStorage}; +use crate::lp_swap::maker_swap_v2::{MakerSwapDbRepr, MakerSwapStateMachine, MakerSwapStorage}; +use crate::lp_swap::taker_swap_v2::{TakerSwapDbRepr, TakerSwapStateMachine, TakerSwapStorage}; use bitcrypto::sha256; use coins::{lp_coinfind, lp_coinfind_or_err, CoinFindError, DexFee, MmCoin, MmCoinEnum, TradeFee, TransactionEnum}; use common::log::{debug, warn}; @@ -115,7 +115,9 @@ mod trade_preimage; #[cfg(target_arch = "wasm32")] mod swap_wasm_db; +use crate::lp_swap::swap_v2_common::{swap_kickstart_handler, GetSwapCoins}; pub use check_balance::{check_other_coin_balance_for_swap, CheckBalanceError, CheckBalanceResult}; +use coins::eth::EthCoin; use coins::utxo::utxo_standard::UtxoStandardCoin; use crypto::secret_hash_algo::SecretHashAlgo; use crypto::CryptoCtx; @@ -130,7 +132,7 @@ use pubkey_banning::BanReason; pub use pubkey_banning::{ban_pubkey_rpc, is_pubkey_banned, list_banned_pubkeys_rpc, unban_pubkeys_rpc}; pub use recreate_swap_data::recreate_swap_data; pub use saved_swap::{SavedSwap, SavedSwapError, SavedSwapIo, SavedSwapResult}; -use swap_v2_common::{get_unfinished_swaps_uuids, swap_kickstart_handler, ActiveSwapV2Info}; +use swap_v2_common::{get_unfinished_swaps_uuids, swap_kickstart_coins, ActiveSwapV2Info}; use swap_v2_pb::*; use swap_v2_rpcs::{get_maker_swap_data_for_rpc, get_swap_type, get_taker_swap_data_for_rpc}; pub use swap_watcher::{process_watcher_msg, watcher_topic, TakerSwapWatcherData, MAKER_PAYMENT_SPEND_FOUND_LOG, @@ -1358,9 +1360,20 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result, String> { try_s!(migrate_swaps_data(&ctx).await); let mut coins = HashSet::new(); + + kickstart_legacy_swaps(&ctx, &mut coins).await?; + + kickstart_maker_swaps(&ctx, &mut coins).await?; + + kickstart_taker_swaps(&ctx, &mut coins).await?; + + Ok(coins) +} + +async fn kickstart_legacy_swaps(ctx: &MmArc, coins: &mut HashSet) -> Result<(), String> { let legacy_unfinished_uuids = try_s!(get_unfinished_swaps_uuids(ctx.clone(), LEGACY_SWAP_TYPE).await); for uuid in legacy_unfinished_uuids { - let swap = match SavedSwap::load_my_swap_from_db(&ctx, uuid).await { + let swap = match SavedSwap::load_my_swap_from_db(ctx, uuid).await { Ok(Some(s)) => s, Ok(None) => { warn!("Swap {} is indexed, but doesn't exist in DB", uuid); @@ -1372,6 +1385,7 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result, String> { }, }; info!("Kick starting the swap {}", swap.uuid()); + let maker_coin_ticker = match swap.maker_coin_ticker() { Ok(t) => t, Err(e) => { @@ -1379,6 +1393,7 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result, String> { continue; }, }; + let taker_coin_ticker = match swap.taker_coin_ticker() { Ok(t) => t, Err(e) => { @@ -1386,15 +1401,20 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result, String> { continue; }, }; + coins.insert(maker_coin_ticker.clone()); coins.insert(taker_coin_ticker.clone()); let fut = kickstart_thread_handler(ctx.clone(), swap, maker_coin_ticker, taker_coin_ticker); ctx.spawner().spawn(fut); } + Ok(()) +} +async fn kickstart_maker_swaps(ctx: &MmArc, coins: &mut HashSet) -> Result<(), String> { let maker_swap_storage = MakerSwapStorage::new(ctx.clone()); let unfinished_maker_uuids = try_s!(maker_swap_storage.get_unfinished().await); + for maker_uuid in unfinished_maker_uuids { info!("Trying to kickstart maker swap {}", maker_uuid); let maker_swap_repr = match maker_swap_storage.get_repr(maker_uuid).await { @@ -1409,17 +1429,25 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result, String> { coins.insert(maker_swap_repr.maker_coin.clone()); coins.insert(maker_swap_repr.taker_coin.clone()); - let fut = swap_kickstart_handler::>( - ctx.clone(), - maker_swap_repr, - maker_swap_storage.clone(), - maker_uuid, - ); - ctx.spawner().spawn(fut); + if let Some((maker_coin, taker_coin)) = swap_kickstart_coins(ctx, &maker_swap_repr, &maker_uuid).await { + spawn_swap_kickstart_handler_for_maker( + ctx, + maker_swap_repr, + maker_swap_storage.clone(), + maker_uuid, + maker_coin, + taker_coin, + ) + .await; + } } + Ok(()) +} +async fn kickstart_taker_swaps(ctx: &MmArc, coins: &mut HashSet) -> Result<(), String> { let taker_swap_storage = TakerSwapStorage::new(ctx.clone()); let unfinished_taker_uuids = try_s!(taker_swap_storage.get_unfinished().await); + for taker_uuid in unfinished_taker_uuids { info!("Trying to kickstart taker swap {}", taker_uuid); let taker_swap_repr = match taker_swap_storage.get_repr(taker_uuid).await { @@ -1434,15 +1462,125 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result, String> { coins.insert(taker_swap_repr.maker_coin.clone()); coins.insert(taker_swap_repr.taker_coin.clone()); - let fut = swap_kickstart_handler::>( - ctx.clone(), - taker_swap_repr, - taker_swap_storage.clone(), - taker_uuid, - ); - ctx.spawner().spawn(fut); + if let Some((maker_coin, taker_coin)) = swap_kickstart_coins(ctx, &taker_swap_repr, &taker_uuid).await { + spawn_swap_kickstart_handler_for_taker( + ctx, + taker_swap_repr, + taker_swap_storage.clone(), + taker_uuid, + maker_coin, + taker_coin, + ) + .await; + } } - Ok(coins) + Ok(()) +} + +async fn spawn_swap_kickstart_handler_for_maker( + ctx: &MmArc, + maker_swap_repr: MakerSwapDbRepr, + maker_swap_storage: MakerSwapStorage, + maker_uuid: Uuid, + maker_coin: MmCoinEnum, + taker_coin: MmCoinEnum, +) { + match (maker_coin, taker_coin) { + (MmCoinEnum::UtxoCoin(m), MmCoinEnum::UtxoCoin(t)) => { + let fut = swap_kickstart_handler::< + MakerSwapStateMachine, + UtxoStandardCoin, + UtxoStandardCoin, + >(maker_swap_repr, maker_swap_storage, maker_uuid, m, t); + ctx.spawner().spawn(fut); + }, + (MmCoinEnum::EthCoin(m), MmCoinEnum::EthCoin(t)) => { + let fut = swap_kickstart_handler::, EthCoin, EthCoin>( + maker_swap_repr, + maker_swap_storage, + maker_uuid, + m, + t, + ); + ctx.spawner().spawn(fut); + }, + (MmCoinEnum::UtxoCoin(m), MmCoinEnum::EthCoin(t)) => { + let fut = swap_kickstart_handler::< + MakerSwapStateMachine, + UtxoStandardCoin, + EthCoin, + >(maker_swap_repr, maker_swap_storage, maker_uuid, m, t); + ctx.spawner().spawn(fut); + }, + (MmCoinEnum::EthCoin(m), MmCoinEnum::UtxoCoin(t)) => { + let fut = swap_kickstart_handler::< + MakerSwapStateMachine, + EthCoin, + UtxoStandardCoin, + >(maker_swap_repr, maker_swap_storage, maker_uuid, m, t); + ctx.spawner().spawn(fut); + }, + _ => { + error!( + "V2 swaps are not currently supported for {}/{} pair", + maker_swap_repr.maker_coin(), + maker_swap_repr.taker_coin() + ); + }, + }; +} + +async fn spawn_swap_kickstart_handler_for_taker( + ctx: &MmArc, + taker_swap_repr: TakerSwapDbRepr, + taker_swap_storage: TakerSwapStorage, + taker_uuid: Uuid, + maker_coin: MmCoinEnum, + taker_coin: MmCoinEnum, +) { + match (maker_coin, taker_coin) { + (MmCoinEnum::UtxoCoin(m), MmCoinEnum::UtxoCoin(t)) => { + let fut = swap_kickstart_handler::< + TakerSwapStateMachine, + UtxoStandardCoin, + UtxoStandardCoin, + >(taker_swap_repr, taker_swap_storage, taker_uuid, m, t); + ctx.spawner().spawn(fut); + }, + (MmCoinEnum::EthCoin(m), MmCoinEnum::EthCoin(t)) => { + let fut = swap_kickstart_handler::, EthCoin, EthCoin>( + taker_swap_repr, + taker_swap_storage, + taker_uuid, + m, + t, + ); + ctx.spawner().spawn(fut); + }, + (MmCoinEnum::UtxoCoin(m), MmCoinEnum::EthCoin(t)) => { + let fut = swap_kickstart_handler::< + TakerSwapStateMachine, + UtxoStandardCoin, + EthCoin, + >(taker_swap_repr, taker_swap_storage, taker_uuid, m, t); + ctx.spawner().spawn(fut); + }, + (MmCoinEnum::EthCoin(m), MmCoinEnum::UtxoCoin(t)) => { + let fut = swap_kickstart_handler::< + TakerSwapStateMachine, + EthCoin, + UtxoStandardCoin, + >(taker_swap_repr, taker_swap_storage, taker_uuid, m, t); + ctx.spawner().spawn(fut); + }, + _ => { + error!( + "V2 swaps are not currently supported for {}/{} pair", + taker_swap_repr.maker_coin(), + taker_swap_repr.taker_coin() + ); + }, + }; } async fn kickstart_thread_handler(ctx: MmArc, swap: SavedSwap, maker_coin_ticker: String, taker_coin_ticker: String) { diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index 52f1690335e..4b2fb78591f 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -1,8 +1,7 @@ use crate::lp_network::{subscribe_to_topic, unsubscribe_from_topic}; use crate::lp_swap::swap_lock::{SwapLock, SwapLockError, SwapLockOps}; use crate::lp_swap::{swap_v2_topic, SwapsContext}; -use coins::utxo::utxo_standard::UtxoStandardCoin; -use coins::{lp_coinfind, MmCoinEnum}; +use coins::{lp_coinfind, MakerCoinSwapOpsV2, MmCoin, MmCoinEnum, TakerCoinSwapOpsV2}; use common::executor::abortable_queue::AbortableQueue; use common::executor::{SpawnFuture, Timer}; use common::log::{error, info, warn}; @@ -292,25 +291,17 @@ pub(super) trait GetSwapCoins { fn taker_coin(&self) -> &str; } -/// Generic function for upgraded swaps kickstart handling. -/// It is implemented only for UtxoStandardCoin/UtxoStandardCoin case temporary. -pub(super) async fn swap_kickstart_handler< - T: StorableStateMachine>, ->( - ctx: MmArc, - swap_repr: ::DbRepr, - storage: T::Storage, - uuid: ::MachineId, -) where - ::MachineId: Copy + std::fmt::Display, - ::DbRepr: GetSwapCoins, - T::Error: std::fmt::Display, - T::RecreateError: std::fmt::Display, -{ +/// Attempts to find and return the maker and taker coins required for the swap to proceed. +/// If a coin is not activated, it logs the information and retries until the coin is found or an error occurs. +pub(super) async fn swap_kickstart_coins( + ctx: &MmArc, + swap_repr: &T, + uuid: &Uuid, +) -> Option<(MmCoinEnum, MmCoinEnum)> { let taker_coin_ticker = swap_repr.taker_coin(); let taker_coin = loop { - match lp_coinfind(&ctx, taker_coin_ticker).await { + match lp_coinfind(ctx, taker_coin_ticker).await { Ok(Some(c)) => break c, Ok(None) => { info!( @@ -321,7 +312,7 @@ pub(super) async fn swap_kickstart_handler< }, Err(e) => { error!("Error {} on {} find attempt", e, taker_coin_ticker); - return; + return None; }, }; }; @@ -329,7 +320,7 @@ pub(super) async fn swap_kickstart_handler< let maker_coin_ticker = swap_repr.maker_coin(); let maker_coin = loop { - match lp_coinfind(&ctx, maker_coin_ticker).await { + match lp_coinfind(ctx, maker_coin_ticker).await { Ok(Some(c)) => break c, Ok(None) => { info!( @@ -340,23 +331,30 @@ pub(super) async fn swap_kickstart_handler< }, Err(e) => { error!("Error {} on {} find attempt", e, maker_coin_ticker); - return; + return None; }, }; }; - // TODO add ETH support - let (maker_coin, taker_coin) = match (maker_coin, taker_coin) { - (MmCoinEnum::UtxoCoin(m), MmCoinEnum::UtxoCoin(t)) => (m, t), - _ => { - error!( - "V2 swaps are not currently supported for {}/{} pair", - maker_coin_ticker, taker_coin_ticker - ); - return; - }, - }; + Some((maker_coin, taker_coin)) +} +/// Handles the recreation and kickstart of a swap state machine. +pub(super) async fn swap_kickstart_handler< + T: StorableStateMachine>, + MakerCoin: MmCoin + MakerCoinSwapOpsV2, + TakerCoin: MmCoin + TakerCoinSwapOpsV2, +>( + swap_repr: ::DbRepr, + storage: T::Storage, + uuid: ::MachineId, + maker_coin: MakerCoin, + taker_coin: TakerCoin, +) where + ::MachineId: Copy + std::fmt::Display, + T::Error: std::fmt::Display, + T::RecreateError: std::fmt::Display, +{ let recreate_context = SwapRecreateCtx { maker_coin, taker_coin }; let (mut state_machine, state) = match T::recreate_machine(uuid, storage, swap_repr, recreate_context).await {