Skip to content

Commit

Permalink
add recently cancelled orders time cache to orderbook
Browse files Browse the repository at this point in the history
  • Loading branch information
shamardy committed Sep 28, 2024
1 parent 6dc3c7d commit 6ad60e9
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 20 deletions.
6 changes: 5 additions & 1 deletion mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl<K: Eq + Hash, V> ExpirableMap<K, V> {
/// If a value already exists for the given key, it will be updated and then
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
self.clear_expired_entries();
let entry = ExpirableEntry {
expires_at: Instant::now() + exp,
value: v,
Expand All @@ -60,7 +61,10 @@ impl<K: Eq + Hash, V> ExpirableMap<K, V> {

/// Removes a key-value pair from the map and returns the associated value if present.
#[inline]
pub fn remove(&mut self, k: &K) -> Option<V> { self.0.remove(k).map(|v| v.value) }
pub fn remove(&mut self, k: &K) -> Option<V> {
self.clear_expired_entries();
self.0.remove(k).map(|v| v.value)
}
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand Down
74 changes: 68 additions & 6 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ const TAKER_ORDER_TIMEOUT: u64 = 30;
const ORDER_MATCH_TIMEOUT: u64 = 30;
const ORDERBOOK_REQUESTING_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 2;
const MAX_ORDERS_NUMBER_IN_ORDERBOOK_RESPONSE: usize = 1000;
const RECENTLY_CANCELLED_TIMEOUT: u64 = 120;
#[cfg(not(test))]
const TRIE_STATE_HISTORY_TIMEOUT: u64 = 14400;
#[cfg(test)]
Expand Down Expand Up @@ -331,6 +332,30 @@ async fn process_orders_keep_alive(
Ok(())
}

fn process_maker_order_created(
ctx: &MmArc,
from_pubkey: String,
created_msg: new_protocol::MakerOrderCreated,
) -> OrderbookP2PHandlerResult {
// Ignore the order if it was recently cancelled
{
let uuid = Uuid::from(created_msg.uuid);
let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed");
let orderbook = ordermatch_ctx.orderbook.lock();
if let Some(order_pubkey) = orderbook.recently_cancelled.get(&uuid) {
if order_pubkey == &from_pubkey {
warn!("Maker order {} was recently cancelled, ignoring", uuid);
return Ok(());
}
}
}

let order: OrderbookItem = (created_msg, from_pubkey).into();
insert_or_update_order(ctx, order);

Ok(())
}

fn process_maker_order_updated(
ctx: MmArc,
from_pubkey: String,
Expand All @@ -350,6 +375,25 @@ fn process_maker_order_updated(
Ok(())
}

fn process_maker_order_cancelled(
ctx: &MmArc,
from_pubkey: String,
cancelled_msg: new_protocol::MakerOrderCancelled,
) -> OrderbookP2PHandlerResult {
// Add the order to the recently cancelled list to ignore it
// if a new order with the same uuid is received within the RECENTLY_CANCELLED_TIMEOUT timeframe
{
let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed");
let mut orderbook = ordermatch_ctx.orderbook.lock();
orderbook
.recently_cancelled
.insert(cancelled_msg.uuid.into(), from_pubkey.clone());
}

delete_order(ctx, &from_pubkey, cancelled_msg.uuid.into());
Ok(())
}

// fn verify_pubkey_orderbook(orderbook: &GetOrderbookPubkeyItem) -> Result<(), String> {
// let keys: Vec<(_, _)> = orderbook
// .orders
Expand Down Expand Up @@ -548,9 +592,7 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay:
log::debug!("received ordermatch message {:?}", message);
match message {
new_protocol::OrdermatchMessage::MakerOrderCreated(created_msg) => {
let order: OrderbookItem = (created_msg, hex::encode(pubkey.to_bytes().as_slice())).into();
insert_or_update_order(&ctx, order);
Ok(())
process_maker_order_created(&ctx, pubkey.to_hex(), created_msg)
},
new_protocol::OrdermatchMessage::PubkeyKeepAlive(keep_alive) => {
process_orders_keep_alive(ctx, from_peer, pubkey.to_hex(), keep_alive, i_am_relay).await
Expand All @@ -576,8 +618,7 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay:
Ok(())
},
new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => {
delete_order(&ctx, &pubkey.to_hex(), cancelled_msg.uuid.into());
Ok(())
process_maker_order_cancelled(&ctx, pubkey.to_hex(), cancelled_msg)
},
new_protocol::OrdermatchMessage::MakerOrderUpdated(updated_msg) => {
process_maker_order_updated(ctx, pubkey.to_hex(), updated_msg)
Expand Down Expand Up @@ -2477,7 +2518,6 @@ fn collect_orderbook_metrics(ctx: &MmArc, orderbook: &Orderbook) {
mm_gauge!(ctx.metrics, "orderbook.memory_db", memory_db_size as f64);
}

#[derive(Default)]
struct Orderbook {
/// A map from (base, rel).
ordered: HashMap<(String, String), BTreeSet<OrderedByPriceOrder>>,
Expand All @@ -2490,12 +2530,33 @@ struct Orderbook {
order_set: HashMap<Uuid, OrderbookItem>,
/// a map of orderbook states of known maker pubkeys
pubkeys_state: HashMap<String, OrderbookPubkeyState>,
/// The `TimeCache` of recently canceled orders, mapping `Uuid` to the maker pubkey as `String`,
/// used to avoid order recreation in case of out of order p2p messages.
/// Entries are kept for `RECENTLY_CANCELLED_TIMEOUT` seconds.
recently_cancelled: TimeCache<Uuid, String>,
topics_subscribed_to: HashMap<String, OrderbookRequestingState>,
/// MemoryDB instance to store Patricia Tries data
memory_db: MemoryDB<Blake2Hasher64>,
my_p2p_pubkeys: HashSet<String>,
}

impl Default for Orderbook {
fn default() -> Self {
Orderbook {
ordered: HashMap::default(),
pairs_existing_for_base: HashMap::default(),
pairs_existing_for_rel: HashMap::default(),
unordered: HashMap::default(),
order_set: HashMap::default(),
pubkeys_state: HashMap::default(),
recently_cancelled: TimeCache::new(Duration::from_secs(RECENTLY_CANCELLED_TIMEOUT)),
topics_subscribed_to: HashMap::default(),
memory_db: MemoryDB::default(),
my_p2p_pubkeys: HashSet::default(),
}
}
}

fn hashed_null_node<T: TrieConfiguration>() -> TrieHash<T> { <T::Codec as NodeCodecT>::hashed_null_node() }

impl Orderbook {
Expand Down Expand Up @@ -3314,6 +3375,7 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) {
// This checks that the order hasn't been removed by another process
if let Some(order_mutex) = removed_order_mutex {
let order = order_mutex.lock().await;
maker_order_cancelled_p2p_notify(ctx.clone(), &order);
delete_my_maker_order(
ctx.clone(),
order.clone(),
Expand Down
26 changes: 13 additions & 13 deletions mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,6 @@ fn test_cancel_order() {
None,
)
.unwrap();
thread::sleep(Duration::from_secs(2));
let (_bob_dump_log, _bob_dump_dashboard) = mm_bob.mm_dump();
log!("Bob log path: {}", mm_bob.log_path.display());
// Enable coins on Bob side. Print the replies in case we need the "address".
Expand All @@ -1469,7 +1468,7 @@ fn test_cancel_order() {
let setprice_json: Json = json::from_str(&rc.1).unwrap();
log!("{:?}", setprice_json);

let mm_alice = MarketMakerIt::start(
let mut mm_alice = MarketMakerIt::start(
json! ({
"gui": "nogui",
"netid": 9998,
Expand All @@ -1485,7 +1484,6 @@ fn test_cancel_order() {
None,
)
.unwrap();
thread::sleep(Duration::from_secs(2));

let (_alice_dump_log, _alice_dump_dashboard) = mm_alice.mm_dump();
log!("Alice log path: {}", mm_alice.log_path.display());
Expand Down Expand Up @@ -1529,9 +1527,10 @@ fn test_cancel_order() {
));
assert!(!order_path.exists());

let pause = 3;
log!("Waiting ({} seconds) for Bob to cancel the order…", pause);
thread::sleep(Duration::from_secs(pause));
block_on(mm_alice.wait_for_log(5., |log| {
log.contains(&format!("Maker order {} was recently cancelled, ignoring", uuid))
}))
.unwrap();

// Bob orderbook must show no orders
log!("Get RICK/MORTY orderbook on Bob side");
Expand Down Expand Up @@ -1564,6 +1563,9 @@ fn test_cancel_order() {
assert_eq!(alice_orderbook.asks.len(), 0, "Alice RICK/MORTY asks are not empty");
}

/// This also covers recently canceled orders implementation.
/// The cancellation message arrives to alice before the order creation message
/// as alice gets the order creation message from p2p cache using IHAVE / IWANT control messages.
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_cancel_all_orders() {
Expand Down Expand Up @@ -1613,7 +1615,7 @@ fn test_cancel_all_orders() {
let setprice_json: Json = json::from_str(&rc.1).unwrap();
log!("{:?}", setprice_json);

let mm_alice = MarketMakerIt::start(
let mut mm_alice = MarketMakerIt::start(
json! ({
"gui": "nogui",
"netid": 9998,
Expand All @@ -1639,9 +1641,6 @@ fn test_cancel_all_orders() {
block_on(enable_coins_rick_morty_electrum(&mm_alice))
);

log!("Give Alice 3 seconds to import the order…");
thread::sleep(Duration::from_secs(3));

log!("Get RICK/MORTY orderbook on Alice side");
let rc = block_on(mm_alice.rpc(&json! ({
"userpass": mm_alice.userpass,
Expand Down Expand Up @@ -1674,9 +1673,10 @@ fn test_cancel_all_orders() {
));
assert!(!order_path.exists());

let pause = 3;
log!("Waiting ({} seconds) for Bob to cancel the order…", pause);
thread::sleep(Duration::from_secs(pause));
block_on(mm_alice.wait_for_log(5., |log| {
log.contains(&format!("Maker order {} was recently cancelled, ignoring", uuid))
}))
.unwrap();

// Bob orderbook must show no orders
log!("Get RICK/MORTY orderbook on Bob side");
Expand Down

0 comments on commit 6ad60e9

Please sign in to comment.