Skip to content

Commit

Permalink
Revert "fix running_swap memory leak"
Browse files Browse the repository at this point in the history
This reverts commit 245ea93.
  • Loading branch information
mariocynicys committed Dec 23, 2024
1 parent 70afcde commit 0ce3c93
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 59 deletions.
58 changes: 32 additions & 26 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use std::convert::TryFrom;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -531,11 +531,8 @@ struct LockedAmountInfo {
locked_amount: LockedAmount,
}

/// A running swap is the swap accompanied by the abort handle of the thread the swap is running on.
type RunningSwap = (Arc<dyn AtomicSwap>, AbortOnDropHandle);

struct SwapsContext {
running_swaps: Mutex<HashMap<Uuid, RunningSwap>>,
running_swaps: Mutex<Vec<(Weak<dyn AtomicSwap>, AbortOnDropHandle)>>,
active_swaps_v2_infos: Mutex<HashMap<Uuid, ActiveSwapV2Info>>,
banned_pubkeys: Mutex<HashMap<H256Json, BanReason>>,
swap_msgs: Mutex<HashMap<Uuid, SwapMsgStore>>,
Expand All @@ -551,7 +548,7 @@ impl SwapsContext {
fn from_ctx(ctx: &MmArc) -> Result<Arc<SwapsContext>, String> {
Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || {
Ok(SwapsContext {
running_swaps: Mutex::new(HashMap::new()),
running_swaps: Mutex::new(vec![]),
active_swaps_v2_infos: Mutex::new(HashMap::new()),
banned_pubkeys: Mutex::new(HashMap::new()),
swap_msgs: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -636,9 +633,11 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

let mut locked = swap_lock.values().flat_map(|(swap, _)| swap.locked_amount()).fold(
MmNumber::from(0),
|mut total_amount, locked| {
let mut locked = swap_lock
.iter()
.filter_map(|(swap, _)| swap.upgrade())
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
Expand All @@ -648,8 +647,7 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
}
}
total_amount
},
);
});
drop(swap_lock);

let locked_amounts = swap_ctx.locked_amounts.lock().unwrap();
Expand All @@ -673,8 +671,11 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
/// Get number of currently running swaps
pub fn running_swaps_num(ctx: &MmArc) -> u64 {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let count = swap_ctx.running_swaps.lock().unwrap().len();
count as u64
let swaps = swap_ctx.running_swaps.lock().unwrap();
swaps.iter().fold(0, |total, (swap, _)| match swap.upgrade() {
Some(_) => total + 1,
None => total,
})
}

/// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid
Expand All @@ -683,9 +684,10 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str)
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

swap_lock
.values()
.filter(|(swap, _)| swap.uuid() != except_uuid)
.flat_map(|(swap, _)| swap.locked_amount())
.iter()
.filter_map(|(swap, _)| swap.upgrade())
.filter(|swap| swap.uuid() != except_uuid)
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
Expand All @@ -703,9 +705,11 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = try_s!(swap_ctx.running_swaps.lock());
let mut uuids = vec![];
for (swap, _) in swaps.values() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
for (swap, _) in swaps.iter() {
if let Some(swap) = swap.upgrade() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
}
}
drop(swaps);
Expand All @@ -721,13 +725,15 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<

pub fn active_swaps(ctx: &MmArc) -> Result<Vec<(Uuid, u8)>, String> {
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let mut uuids: Vec<_> = swap_ctx
.running_swaps
.lock()
.unwrap()
.keys()
.map(|uuid| (*uuid, LEGACY_SWAP_TYPE))
.collect();
let swaps = swap_ctx.running_swaps.lock().unwrap();
let mut uuids = vec![];
for (swap, _) in swaps.iter() {
if let Some(swap) = swap.upgrade() {
uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE))
}
}

drop(swaps);

let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap();
uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type)));
Expand Down
16 changes: 5 additions & 11 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2090,10 +2090,10 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
};
}
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker);
let mut swap_fut = Box::pin({
let running_swap = running_swap.clone();
let mut swap_fut = Box::pin(
async move {
let mut events;
loop {
Expand Down Expand Up @@ -2150,8 +2150,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
}
}
}
.fuse()
});
.fuse(),
);
// Run the swap in an abortable task and wait for it to finish.
let (swap_ended_notifier, swap_ended_notification) = oneshot::channel();
let abortable_swap = spawn_abortable(async move {
Expand All @@ -2163,15 +2163,9 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
error!("Swap listener stopped listening!");
}
});
let uuid = running_swap.uuid;
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, (running_swap, abortable_swap));
swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
// Halt this function until the swap has finished (or interrupted, i.e. aborted/panic).
swap_ended_notification.await.error_log_with_msg("Swap interrupted!");
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

pub struct MakerSwapPreparedParams {
Expand Down
17 changes: 11 additions & 6 deletions mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,11 @@ pub(crate) struct StopSwapResponse {

pub(crate) async fn stop_swap_rpc(ctx: MmArc, req: StopSwapRequest) -> MmResult<StopSwapResponse, StopSwapErr> {
let swap_ctx = SwapsContext::from_ctx(&ctx).map_err(StopSwapErr::Internal)?;
// By just removing the swap's abort handle from the running swaps map, the swap will terminate.
if swap_ctx.running_swaps.lock().unwrap().remove(&req.uuid).is_none() {
let mut running_swaps = swap_ctx.running_swaps.lock().unwrap();
let Some(position) = running_swaps.iter().position(|(swap, _)| swap.upgrade().map_or(true, |swap| swap.uuid() == &req.uuid)) else {
return MmError::err(StopSwapErr::NotRunning);
}
};
let (_swap, _abort_handle) = running_swaps.swap_remove(position);
Ok(StopSwapResponse {
result: "Success".to_string(),
})
Expand Down Expand Up @@ -581,8 +582,12 @@ pub(crate) async fn kickstart_swap_rpc(
// up with the same swap being kickstarted twice, but we have filesystem swap locks for that. This check is
// rather for convenience.
let swap_ctx = SwapsContext::from_ctx(&ctx).map_err(KickStartSwapErr::Internal)?;
if swap_ctx.running_swaps.lock().unwrap().contains_key(&req.uuid) {
return MmError::err(KickStartSwapErr::AlreadyRunning);
for (swap, _) in swap_ctx.running_swaps.lock().unwrap().iter() {
if let Some(swap) = swap.upgrade() {
if swap.uuid() == &req.uuid {
return MmError::err(KickStartSwapErr::AlreadyRunning);
}
}
}
// Load the swap from the DB.
let swap = match SavedSwap::load_my_swap_from_db(&ctx, req.uuid).await {
Expand Down Expand Up @@ -642,7 +647,7 @@ pub(crate) async fn kickstart_swap_rpc(
)));
},
};
// Kickstart the swap. A new abort handle will show up shortly for the swap.
// Kickstart the swap. A new aborthandle will show up shortly for the swap.
match swap {
SavedSwap::Maker(saved_swap) => ctx.spawner().spawn(run_maker_swap(
RunMakerSwapInput::KickStart {
Expand Down
23 changes: 7 additions & 16 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
let uuid = swap.uuid.to_string();
let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy());
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker);
let mut swap_fut = Box::pin({
let running_swap = running_swap.clone();
let mut swap_fut = Box::pin(
async move {
let mut events;
loop {
Expand Down Expand Up @@ -516,8 +516,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
}
}
}
.fuse()
});
.fuse(),
);
// Run the swap in an abortable task and wait for it to finish.
let (swap_ended_notifier, swap_ended_notification) = oneshot::channel();
let abortable_swap = spawn_abortable(async move {
Expand All @@ -529,15 +529,9 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
error!("Swap listener stopped listening!");
}
});
let uuid = running_swap.uuid;
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, (running_swap, abortable_swap));
swap_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));
// Halt this function until the swap has finished (or interrupted, i.e. aborted/panic).
swap_ended_notification.await.error_log_with_msg("Swap interrupted!");
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -3222,13 +3216,10 @@ mod taker_swap_tests {
.unwrap();
let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap();
let arc = Arc::new(swap);
let weak_ref = Arc::downgrade(&arc);
// Create a dummy abort handle as if it was a running swap.
let abortable_swap = spawn_abortable(async move {});
swaps_ctx
.running_swaps
.lock()
.unwrap()
.insert(arc.uuid, (arc, abortable_swap));
swaps_ctx.running_swaps.lock().unwrap().push((weak_ref, abortable_swap));

let actual = get_locked_amount(&ctx, "RICK");
assert_eq!(actual, MmNumber::from(0));
Expand Down

0 comments on commit 0ce3c93

Please sign in to comment.