Skip to content

Commit

Permalink
call swap_kickstart_handler for TakerSwapStateMachine and for MakerSw…
Browse files Browse the repository at this point in the history
…apStateMachine
  • Loading branch information
laruh committed Dec 22, 2024
1 parent f8ba975 commit d08fa86
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 51 deletions.
176 changes: 157 additions & 19 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@

use super::lp_network::P2PRequestResult;
use crate::lp_network::{broadcast_p2p_msg, Libp2pPeerId, P2PProcessError, P2PProcessResult, P2PRequestError};
use crate::lp_swap::maker_swap_v2::{MakerSwapStateMachine, MakerSwapStorage};
use crate::lp_swap::taker_swap_v2::{TakerSwapStateMachine, TakerSwapStorage};
use crate::lp_swap::maker_swap_v2::{MakerSwapDbRepr, MakerSwapStateMachine, MakerSwapStorage};
use crate::lp_swap::taker_swap_v2::{TakerSwapDbRepr, TakerSwapStateMachine, TakerSwapStorage};
use bitcrypto::sha256;
use coins::{lp_coinfind, lp_coinfind_or_err, CoinFindError, DexFee, MmCoin, MmCoinEnum, TradeFee, TransactionEnum};
use common::log::{debug, warn};
Expand Down Expand Up @@ -115,7 +115,9 @@ mod trade_preimage;

#[cfg(target_arch = "wasm32")] mod swap_wasm_db;

use crate::lp_swap::swap_v2_common::{swap_kickstart_handler, GetSwapCoins};
pub use check_balance::{check_other_coin_balance_for_swap, CheckBalanceError, CheckBalanceResult};
use coins::eth::EthCoin;
use coins::utxo::utxo_standard::UtxoStandardCoin;
use crypto::secret_hash_algo::SecretHashAlgo;
use crypto::CryptoCtx;
Expand All @@ -130,7 +132,7 @@ use pubkey_banning::BanReason;
pub use pubkey_banning::{ban_pubkey_rpc, is_pubkey_banned, list_banned_pubkeys_rpc, unban_pubkeys_rpc};
pub use recreate_swap_data::recreate_swap_data;
pub use saved_swap::{SavedSwap, SavedSwapError, SavedSwapIo, SavedSwapResult};
use swap_v2_common::{get_unfinished_swaps_uuids, swap_kickstart_handler, ActiveSwapV2Info};
use swap_v2_common::{get_unfinished_swaps_uuids, swap_kickstart_coins, ActiveSwapV2Info};
use swap_v2_pb::*;
use swap_v2_rpcs::{get_maker_swap_data_for_rpc, get_swap_type, get_taker_swap_data_for_rpc};
pub use swap_watcher::{process_watcher_msg, watcher_topic, TakerSwapWatcherData, MAKER_PAYMENT_SPEND_FOUND_LOG,
Expand Down Expand Up @@ -1358,9 +1360,20 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result<HashSet<String>, String> {
try_s!(migrate_swaps_data(&ctx).await);

let mut coins = HashSet::new();

kickstart_legacy_swaps(&ctx, &mut coins).await?;

kickstart_maker_swaps(&ctx, &mut coins).await?;

kickstart_taker_swaps(&ctx, &mut coins).await?;

Ok(coins)
}

async fn kickstart_legacy_swaps(ctx: &MmArc, coins: &mut HashSet<String>) -> Result<(), String> {
let legacy_unfinished_uuids = try_s!(get_unfinished_swaps_uuids(ctx.clone(), LEGACY_SWAP_TYPE).await);
for uuid in legacy_unfinished_uuids {
let swap = match SavedSwap::load_my_swap_from_db(&ctx, uuid).await {
let swap = match SavedSwap::load_my_swap_from_db(ctx, uuid).await {
Ok(Some(s)) => s,
Ok(None) => {
warn!("Swap {} is indexed, but doesn't exist in DB", uuid);
Expand All @@ -1372,29 +1385,36 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result<HashSet<String>, String> {
},
};
info!("Kick starting the swap {}", swap.uuid());

let maker_coin_ticker = match swap.maker_coin_ticker() {
Ok(t) => t,
Err(e) => {
error!("Error {} getting maker coin of swap: {}", e, swap.uuid());
continue;
},
};

let taker_coin_ticker = match swap.taker_coin_ticker() {
Ok(t) => t,
Err(e) => {
error!("Error {} getting taker coin of swap {}", e, swap.uuid());
continue;
},
};

coins.insert(maker_coin_ticker.clone());
coins.insert(taker_coin_ticker.clone());

let fut = kickstart_thread_handler(ctx.clone(), swap, maker_coin_ticker, taker_coin_ticker);
ctx.spawner().spawn(fut);
}
Ok(())
}

async fn kickstart_maker_swaps(ctx: &MmArc, coins: &mut HashSet<String>) -> Result<(), String> {
let maker_swap_storage = MakerSwapStorage::new(ctx.clone());
let unfinished_maker_uuids = try_s!(maker_swap_storage.get_unfinished().await);

for maker_uuid in unfinished_maker_uuids {
info!("Trying to kickstart maker swap {}", maker_uuid);
let maker_swap_repr = match maker_swap_storage.get_repr(maker_uuid).await {
Expand All @@ -1409,17 +1429,25 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result<HashSet<String>, String> {
coins.insert(maker_swap_repr.maker_coin.clone());
coins.insert(maker_swap_repr.taker_coin.clone());

let fut = swap_kickstart_handler::<MakerSwapStateMachine<UtxoStandardCoin, UtxoStandardCoin>>(
ctx.clone(),
maker_swap_repr,
maker_swap_storage.clone(),
maker_uuid,
);
ctx.spawner().spawn(fut);
if let Some((maker_coin, taker_coin)) = swap_kickstart_coins(ctx, &maker_swap_repr, &maker_uuid).await {
spawn_swap_kickstart_handler_for_maker(
ctx,
maker_swap_repr,
maker_swap_storage.clone(),
maker_uuid,
maker_coin,
taker_coin,
)
.await;
}
}
Ok(())
}

async fn kickstart_taker_swaps(ctx: &MmArc, coins: &mut HashSet<String>) -> Result<(), String> {
let taker_swap_storage = TakerSwapStorage::new(ctx.clone());
let unfinished_taker_uuids = try_s!(taker_swap_storage.get_unfinished().await);

for taker_uuid in unfinished_taker_uuids {
info!("Trying to kickstart taker swap {}", taker_uuid);
let taker_swap_repr = match taker_swap_storage.get_repr(taker_uuid).await {
Expand All @@ -1434,15 +1462,125 @@ pub async fn swap_kick_starts(ctx: MmArc) -> Result<HashSet<String>, String> {
coins.insert(taker_swap_repr.maker_coin.clone());
coins.insert(taker_swap_repr.taker_coin.clone());

let fut = swap_kickstart_handler::<TakerSwapStateMachine<UtxoStandardCoin, UtxoStandardCoin>>(
ctx.clone(),
taker_swap_repr,
taker_swap_storage.clone(),
taker_uuid,
);
ctx.spawner().spawn(fut);
if let Some((maker_coin, taker_coin)) = swap_kickstart_coins(ctx, &taker_swap_repr, &taker_uuid).await {
spawn_swap_kickstart_handler_for_taker(
ctx,
taker_swap_repr,
taker_swap_storage.clone(),
taker_uuid,
maker_coin,
taker_coin,
)
.await;
}
}
Ok(coins)
Ok(())
}

async fn spawn_swap_kickstart_handler_for_maker(
ctx: &MmArc,
maker_swap_repr: MakerSwapDbRepr,
maker_swap_storage: MakerSwapStorage,
maker_uuid: Uuid,
maker_coin: MmCoinEnum,
taker_coin: MmCoinEnum,
) {
match (maker_coin, taker_coin) {
(MmCoinEnum::UtxoCoin(m), MmCoinEnum::UtxoCoin(t)) => {
let fut = swap_kickstart_handler::<
MakerSwapStateMachine<UtxoStandardCoin, UtxoStandardCoin>,
UtxoStandardCoin,
UtxoStandardCoin,
>(maker_swap_repr, maker_swap_storage, maker_uuid, m, t);
ctx.spawner().spawn(fut);
},
(MmCoinEnum::EthCoin(m), MmCoinEnum::EthCoin(t)) => {
let fut = swap_kickstart_handler::<MakerSwapStateMachine<EthCoin, EthCoin>, EthCoin, EthCoin>(
maker_swap_repr,
maker_swap_storage,
maker_uuid,
m,
t,
);
ctx.spawner().spawn(fut);
},
(MmCoinEnum::UtxoCoin(m), MmCoinEnum::EthCoin(t)) => {
let fut = swap_kickstart_handler::<
MakerSwapStateMachine<UtxoStandardCoin, EthCoin>,
UtxoStandardCoin,
EthCoin,
>(maker_swap_repr, maker_swap_storage, maker_uuid, m, t);
ctx.spawner().spawn(fut);
},
(MmCoinEnum::EthCoin(m), MmCoinEnum::UtxoCoin(t)) => {
let fut = swap_kickstart_handler::<
MakerSwapStateMachine<EthCoin, UtxoStandardCoin>,
EthCoin,
UtxoStandardCoin,
>(maker_swap_repr, maker_swap_storage, maker_uuid, m, t);
ctx.spawner().spawn(fut);
},
_ => {
error!(
"V2 swaps are not currently supported for {}/{} pair",
maker_swap_repr.maker_coin(),
maker_swap_repr.taker_coin()
);
},
};
}

async fn spawn_swap_kickstart_handler_for_taker(
ctx: &MmArc,
taker_swap_repr: TakerSwapDbRepr,
taker_swap_storage: TakerSwapStorage,
taker_uuid: Uuid,
maker_coin: MmCoinEnum,
taker_coin: MmCoinEnum,
) {
match (maker_coin, taker_coin) {
(MmCoinEnum::UtxoCoin(m), MmCoinEnum::UtxoCoin(t)) => {
let fut = swap_kickstart_handler::<
TakerSwapStateMachine<UtxoStandardCoin, UtxoStandardCoin>,
UtxoStandardCoin,
UtxoStandardCoin,
>(taker_swap_repr, taker_swap_storage, taker_uuid, m, t);
ctx.spawner().spawn(fut);
},
(MmCoinEnum::EthCoin(m), MmCoinEnum::EthCoin(t)) => {
let fut = swap_kickstart_handler::<TakerSwapStateMachine<EthCoin, EthCoin>, EthCoin, EthCoin>(
taker_swap_repr,
taker_swap_storage,
taker_uuid,
m,
t,
);
ctx.spawner().spawn(fut);
},
(MmCoinEnum::UtxoCoin(m), MmCoinEnum::EthCoin(t)) => {
let fut = swap_kickstart_handler::<
TakerSwapStateMachine<UtxoStandardCoin, EthCoin>,
UtxoStandardCoin,
EthCoin,
>(taker_swap_repr, taker_swap_storage, taker_uuid, m, t);
ctx.spawner().spawn(fut);
},
(MmCoinEnum::EthCoin(m), MmCoinEnum::UtxoCoin(t)) => {
let fut = swap_kickstart_handler::<
TakerSwapStateMachine<EthCoin, UtxoStandardCoin>,
EthCoin,
UtxoStandardCoin,
>(taker_swap_repr, taker_swap_storage, taker_uuid, m, t);
ctx.spawner().spawn(fut);
},
_ => {
error!(
"V2 swaps are not currently supported for {}/{} pair",
taker_swap_repr.maker_coin(),
taker_swap_repr.taker_coin()
);
},
};
}

async fn kickstart_thread_handler(ctx: MmArc, swap: SavedSwap, maker_coin_ticker: String, taker_coin_ticker: String) {
Expand Down
62 changes: 30 additions & 32 deletions mm2src/mm2_main/src/lp_swap/swap_v2_common.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::lp_network::{subscribe_to_topic, unsubscribe_from_topic};
use crate::lp_swap::swap_lock::{SwapLock, SwapLockError, SwapLockOps};
use crate::lp_swap::{swap_v2_topic, SwapsContext};
use coins::utxo::utxo_standard::UtxoStandardCoin;
use coins::{lp_coinfind, MmCoinEnum};
use coins::{lp_coinfind, MakerCoinSwapOpsV2, MmCoin, MmCoinEnum, TakerCoinSwapOpsV2};
use common::executor::abortable_queue::AbortableQueue;
use common::executor::{SpawnFuture, Timer};
use common::log::{error, info, warn};
Expand Down Expand Up @@ -292,25 +291,17 @@ pub(super) trait GetSwapCoins {
fn taker_coin(&self) -> &str;
}

/// Generic function for upgraded swaps kickstart handling.
/// It is implemented only for UtxoStandardCoin/UtxoStandardCoin case temporary.
pub(super) async fn swap_kickstart_handler<
T: StorableStateMachine<RecreateCtx = SwapRecreateCtx<UtxoStandardCoin, UtxoStandardCoin>>,
>(
ctx: MmArc,
swap_repr: <T::Storage as StateMachineStorage>::DbRepr,
storage: T::Storage,
uuid: <T::Storage as StateMachineStorage>::MachineId,
) where
<T::Storage as StateMachineStorage>::MachineId: Copy + std::fmt::Display,
<T::Storage as StateMachineStorage>::DbRepr: GetSwapCoins,
T::Error: std::fmt::Display,
T::RecreateError: std::fmt::Display,
{
/// Attempts to find and return the maker and taker coins required for the swap to proceed.
/// If a coin is not activated, it logs the information and retries until the coin is found or an error occurs.
pub(super) async fn swap_kickstart_coins<T: GetSwapCoins>(
ctx: &MmArc,
swap_repr: &T,
uuid: &Uuid,
) -> Option<(MmCoinEnum, MmCoinEnum)> {
let taker_coin_ticker = swap_repr.taker_coin();

let taker_coin = loop {
match lp_coinfind(&ctx, taker_coin_ticker).await {
match lp_coinfind(ctx, taker_coin_ticker).await {
Ok(Some(c)) => break c,
Ok(None) => {
info!(
Expand All @@ -321,15 +312,15 @@ pub(super) async fn swap_kickstart_handler<
},
Err(e) => {
error!("Error {} on {} find attempt", e, taker_coin_ticker);
return;
return None;
},
};
};

let maker_coin_ticker = swap_repr.maker_coin();

let maker_coin = loop {
match lp_coinfind(&ctx, maker_coin_ticker).await {
match lp_coinfind(ctx, maker_coin_ticker).await {
Ok(Some(c)) => break c,
Ok(None) => {
info!(
Expand All @@ -340,23 +331,30 @@ pub(super) async fn swap_kickstart_handler<
},
Err(e) => {
error!("Error {} on {} find attempt", e, maker_coin_ticker);
return;
return None;
},
};
};

// TODO add ETH support
let (maker_coin, taker_coin) = match (maker_coin, taker_coin) {
(MmCoinEnum::UtxoCoin(m), MmCoinEnum::UtxoCoin(t)) => (m, t),
_ => {
error!(
"V2 swaps are not currently supported for {}/{} pair",
maker_coin_ticker, taker_coin_ticker
);
return;
},
};
Some((maker_coin, taker_coin))
}

/// Handles the recreation and kickstart of a swap state machine.
pub(super) async fn swap_kickstart_handler<
T: StorableStateMachine<RecreateCtx = SwapRecreateCtx<MakerCoin, TakerCoin>>,
MakerCoin: MmCoin + MakerCoinSwapOpsV2,
TakerCoin: MmCoin + TakerCoinSwapOpsV2,
>(
swap_repr: <T::Storage as StateMachineStorage>::DbRepr,
storage: T::Storage,
uuid: <T::Storage as StateMachineStorage>::MachineId,
maker_coin: MakerCoin,
taker_coin: TakerCoin,
) where
<T::Storage as StateMachineStorage>::MachineId: Copy + std::fmt::Display,
T::Error: std::fmt::Display,
T::RecreateError: std::fmt::Display,
{
let recreate_context = SwapRecreateCtx { maker_coin, taker_coin };

let (mut state_machine, state) = match T::recreate_machine(uuid, storage, swap_repr, recreate_context).await {
Expand Down

0 comments on commit d08fa86

Please sign in to comment.