Skip to content

Commit

Permalink
Merge pull request #839 from Lederstrumpf/swap_state_refactors
Browse files Browse the repository at this point in the history
Swapd: state refactors
  • Loading branch information
h4sh3d authored Dec 15, 2022
2 parents 2a866aa + 1d1e45b commit 806592c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 81 deletions.
156 changes: 77 additions & 79 deletions src/swapd/swap_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use farcaster_core::{
blockchain::Blockchain,
role::{SwapRole, TradeRole},
swap::btcxmr::message::BuyProcedureSignature,
transaction::Fundable,
transaction::TxLabel,
};
use microservices::esb::Handler;
Expand All @@ -32,7 +33,7 @@ use crate::{
swapd::{
runtime::aggregate_xmr_spend_view,
syncer_client::{log_tx_created, log_tx_seen},
wallet::{HandleBuyProcedureSignatureRes, HandleRefundProcedureSignaturesRes},
wallet::{BobState, HandleBuyProcedureSignatureRes, HandleRefundProcedureSignaturesRes},
},
syncerd::{
Abort, Boolean, SweepSuccess, Task, TaskTarget, TransactionConfirmations,
Expand Down Expand Up @@ -540,7 +541,7 @@ impl StateMachine<Runtime, Error> for SwapStateMachine {
}
SwapStateMachine::AlicePunish => try_alice_punish_to_swap_end(event, runtime),

_ => Ok(Some(self)),
SwapStateMachine::SwapEnd(_) => Ok(None),
}
}

Expand Down Expand Up @@ -694,7 +695,6 @@ fn attempt_transition_to_init_maker(
commit.clone(),
)?;
let local_params = wallet.local_params();
let funding_address = wallet.funding_address();
runtime.peer_service = peerd;
if runtime.peer_service != ServiceId::Loopback {
runtime.connected = true;
Expand All @@ -715,20 +715,33 @@ fn attempt_transition_to_init_maker(
// send maker commit message to counter-party
runtime.log_trace(format!("sending peer MakerCommit msg {}", &local_commit));
runtime.send_peer(event.endpoints, PeerMsg::MakerCommit(local_commit.clone()))?;
match swap_role {
SwapRole::Bob => Ok(Some(SwapStateMachine::BobInitMaker(BobInitMaker {
local_commit,
local_params,
funding_address: funding_address.unwrap(),
remote_commit: commit,
wallet,
reveal: None,
}))),
SwapRole::Alice => Ok(Some(SwapStateMachine::AliceInitMaker(AliceInitMaker {
local_params,
remote_commit: commit,
wallet,
}))),
match (swap_role, wallet.clone()) {
(SwapRole::Bob, Wallet::Bob(BobState { funding_tx, .. })) => {
Ok(Some(SwapStateMachine::BobInitMaker(BobInitMaker {
local_commit,
local_params,
funding_address: funding_tx
.get_address()
.expect("Funding address should be valid"),
remote_commit: commit,
wallet,
reveal: None,
})))
}
(SwapRole::Alice, Wallet::Alice(_)) => {
Ok(Some(SwapStateMachine::AliceInitMaker(AliceInitMaker {
local_params,
remote_commit: commit,
wallet,
})))
}
_ => {
runtime.log_error(format!(
"Invalid swap role {} for wallet {}",
swap_role, wallet
));
Ok(None)
}
}
}
BusMsg::Ctl(CtlMsg::AbortSwap) => handle_abort_swap(event, runtime),
Expand Down Expand Up @@ -974,14 +987,12 @@ fn try_bob_reveal_to_bob_funded(
.zip([TxLabel::Lock, TxLabel::Cancel, TxLabel::Refund])
{
runtime.log_debug(format!("register watch {} tx", tx_label.label()));
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}

// Set the monero address creation height for Bob before setting the first checkpoint
Expand Down Expand Up @@ -1066,16 +1077,11 @@ fn try_bob_funded_to_bob_refund_procedure_signature(
// register a watch task for buy tx.
// registration performed now already to ensure it's present in checkpoint.
runtime.log_debug("register watch buy tx task");
if !runtime.syncer_state.is_watched_tx(&TxLabel::Buy) {
let buy_tx = buy_procedure_signature.buy.clone().extract_tx();
let task = runtime
.syncer_state
.watch_tx_btc(buy_tx.txid(), TxLabel::Buy);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let buy_tx = buy_procedure_signature.buy.clone().extract_tx();
let task = runtime
.syncer_state
.watch_tx_btc(buy_tx.txid(), TxLabel::Buy);
event.send_sync_service(runtime.syncer_state.bitcoin_syncer(), SyncMsg::Task(task))?;
// Checkpoint BobRefundProcedureSignatures
let new_ssm =
SwapStateMachine::BobRefundProcedureSignatures(BobRefundProcedureSignatures {
Expand Down Expand Up @@ -1132,13 +1138,11 @@ fn try_bob_refund_procedure_signatures_to_bob_accordant_lock(
}
if let Some(tx_label) = runtime.syncer_state.tasks.watched_addrs.remove(&id) {
let abort_task = runtime.syncer_state.abort_task(id.clone());
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let watch_tx = runtime.syncer_state.watch_tx_xmr(hash.clone(), tx_label);
event.send_sync_service(
runtime.syncer_state.monero_syncer(),
SyncMsg::Task(watch_tx),
)?;
}
let watch_tx = runtime.syncer_state.watch_tx_xmr(hash.clone(), tx_label);
event.send_sync_service(
runtime.syncer_state.monero_syncer(),
SyncMsg::Task(watch_tx),
)?;
event.send_sync_service(
runtime.syncer_state.monero_syncer(),
SyncMsg::Task(abort_task),
Expand Down Expand Up @@ -1457,14 +1461,12 @@ fn try_alice_reveal_to_alice_core_arbitrating_setup(
TxLabel::Refund,
]) {
runtime.log_debug(format!("Register watch {} tx", tx_label));
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
// handle the core arbitrating setup message with the wallet
runtime.log_debug("Handling core arb setup with wallet");
Expand Down Expand Up @@ -1653,18 +1655,15 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock(
id, hash, amount, block, tx
));
let txlabel = TxLabel::AccLock;
if !runtime.syncer_state.is_watched_tx(&txlabel) {
let task = runtime.syncer_state.watch_tx_xmr(hash.clone(), txlabel);
if runtime.syncer_state.awaiting_funding {
event.send_ctl_service(
ServiceId::Farcasterd,
CtlMsg::FundingCompleted(Blockchain::Monero),
)?;
runtime.syncer_state.awaiting_funding = false;
}
event
.send_sync_service(runtime.syncer_state.monero_syncer(), SyncMsg::Task(task))?;
let task = runtime.syncer_state.watch_tx_xmr(hash.clone(), txlabel);
if runtime.syncer_state.awaiting_funding {
event.send_ctl_service(
ServiceId::Farcasterd,
CtlMsg::FundingCompleted(Blockchain::Monero),
)?;
runtime.syncer_state.awaiting_funding = false;
}
event.send_sync_service(runtime.syncer_state.monero_syncer(), SyncMsg::Task(task))?;
if runtime
.syncer_state
.tasks
Expand Down Expand Up @@ -1731,14 +1730,9 @@ fn try_alice_accordant_lock_to_alice_buy_procedure_signature(
BusMsg::P2p(PeerMsg::BuyProcedureSignature(buy_procedure_signature)) => {
// register a watch task for buy
runtime.log_debug("Registering watch buy tx task");
if !runtime.syncer_state.is_watched_tx(&TxLabel::Buy) {
let txid = buy_procedure_signature.buy.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, TxLabel::Buy);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = buy_procedure_signature.buy.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, TxLabel::Buy);
event.send_sync_service(runtime.syncer_state.bitcoin_syncer(), SyncMsg::Task(task))?;
// Handle the received buy procedure signature message with the wallet
runtime.log_debug("Handling buy procedure signature with wallet");
let HandleBuyProcedureSignatureRes { cancel_tx, buy_tx } = wallet
Expand Down Expand Up @@ -1873,14 +1867,12 @@ fn try_alice_canceled_to_alice_refund_or_alice_punish(
runtime.log_debug("Publishing punish tx");
let (tx_label, punish_tx) = runtime.txs.remove_entry(&TxLabel::Punish).unwrap();
// syncer's watch punish tx task
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let txid = punish_tx.txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = punish_tx.txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
runtime.broadcast(punish_tx, tx_label, event.endpoints)?;
Ok(Some(SwapStateMachine::AlicePunish))
}
Expand Down Expand Up @@ -2089,8 +2081,14 @@ fn try_alice_punish_to_swap_end(
) -> Result<Option<SwapStateMachine>, Error> {
match event.request {
BusMsg::Sync(SyncMsg::Event(SyncEvent::TransactionConfirmations(
TransactionConfirmations { id, .. },
))) if runtime.syncer_state.tasks.watched_txs.get(&id) == Some(&TxLabel::Punish) => {
TransactionConfirmations {
id,
confirmations: Some(confirmations),
..
},
))) if runtime.syncer_state.tasks.watched_txs.get(&id) == Some(&TxLabel::Punish)
&& confirmations >= runtime.temporal_safety.btc_finality_thr =>
{
let abort_all = Task::Abort(Abort {
task_target: TaskTarget::AllTasks,
respond: Boolean::False,
Expand Down
14 changes: 14 additions & 0 deletions src/swapd/syncer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ impl SyncerState {
}

pub fn watch_tx_btc(&mut self, txid: Txid, tx_label: TxLabel) -> Task {
if self.is_watched_tx(&tx_label) {
warn!(
"{} | Already watching for tx with label {} - notifications will be repeated",
self.swap_id.swap_id(),
tx_label.label()
);
}
let id = self.tasks.new_taskid();
self.tasks.watched_txs.insert(id, tx_label);
self.tasks.txids.insert(tx_label, txid);
Expand All @@ -136,6 +143,13 @@ impl SyncerState {
self.tasks.watched_txs.values().any(|tx| tx == tx_label)
}
pub fn watch_tx_xmr(&mut self, hash: Vec<u8>, tx_label: TxLabel) -> Task {
if self.is_watched_tx(&tx_label) {
warn!(
"{} | Already watching for tx with label {} - notifications will be repeated",
self.swap_id.swap_id(),
tx_label.label()
);
}
let id = self.tasks.new_taskid();
self.tasks.watched_txs.insert(id, tx_label);
info!(
Expand Down
7 changes: 5 additions & 2 deletions src/swapd/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ pub struct HandleBuyProcedureSignatureRes {
pub buy_tx: bitcoin::Transaction,
}

#[derive(Clone, Debug, StrictEncode, StrictDecode)]
#[derive(Clone, Display, Debug, StrictEncode, StrictDecode)]
pub enum Wallet {
#[display("Alice's wallet")]
Alice(AliceState),
#[display("Bob's wallet")]
Bob(BobState),
}

Expand Down Expand Up @@ -758,8 +760,9 @@ impl Wallet {
}
} else {
error!(
"{} | Wallet not found or not on correct state",
"{} | Not correct wallet, should be Bob's wallet: {}",
swap_id.swap_id(),
self
);
return Err(Error::Farcaster(
"Needs to be a Bob wallet to process Alice Commit".to_string(),
Expand Down

0 comments on commit 806592c

Please sign in to comment.