From 90a1b34b0fe842c4fce79d3605f3d52c3d008593 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 8 Jan 2025 12:13:31 +0100 Subject: [PATCH] Introduce `EventQueueNotifierGuard` type Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifierGuard` type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks. --- lightning-liquidity/src/events.rs | 47 +++++++++++++++++++++++- lightning-liquidity/src/lsps0/client.rs | 2 + lightning-liquidity/src/lsps1/client.rs | 13 ++++++- lightning-liquidity/src/lsps2/client.rs | 4 ++ lightning-liquidity/src/lsps2/service.rs | 6 +++ 5 files changed, 69 insertions(+), 3 deletions(-) diff --git a/lightning-liquidity/src/events.rs b/lightning-liquidity/src/events.rs index 3db772deec8..873f9388d30 100644 --- a/lightning-liquidity/src/events.rs +++ b/lightning-liquidity/src/events.rs @@ -31,7 +31,7 @@ pub(crate) struct EventQueue { queue: Arc>>, waker: Arc>>, #[cfg(feature = "std")] - condvar: crate::sync::Condvar, + condvar: Arc, } impl EventQueue { @@ -40,7 +40,7 @@ impl EventQueue { let waker = Arc::new(Mutex::new(None)); #[cfg(feature = "std")] { - let condvar = crate::sync::Condvar::new(); + let condvar = Arc::new(crate::sync::Condvar::new()); Self { queue, waker, condvar } } #[cfg(not(feature = "std"))] @@ -98,6 +98,49 @@ impl EventQueue { pub fn get_and_clear_pending_events(&self) -> Vec { self.queue.lock().unwrap().split_off(0).into() } + + // Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped. + pub fn notifier(&self) -> EventQueueNotifierGuard { + #[cfg(feature = "std")] + { + EventQueueNotifierGuard { + queue: Arc::clone(&self.queue), + waker: Arc::clone(&self.waker), + condvar: Arc::clone(&self.condvar), + } + } + #[cfg(not(feature = "std"))] + { + EventQueueNotifierGuard { + queue: Arc::clone(&self.queue), + waker: Arc::clone(&self.waker), + } + } + } +} + +// A guard type that will notify about new events when dropped. +#[must_use] +pub(crate) struct EventQueueNotifierGuard { + queue: Arc>>, + waker: Arc>>, + #[cfg(feature = "std")] + condvar: Arc, +} + +impl Drop for EventQueueNotifierGuard { + fn drop(&mut self) { + let should_notify = !self.queue.lock().unwrap().is_empty(); + + if should_notify { + if let Some(waker) = self.waker.lock().unwrap().take() { + waker.wake(); + } + + #[cfg(feature = "std")] + self.condvar.notify_one(); + } + } } /// An event which you should probably take some action in response to. diff --git a/lightning-liquidity/src/lsps0/client.rs b/lightning-liquidity/src/lsps0/client.rs index 9e4eb33c439..5b846733360 100644 --- a/lightning-liquidity/src/lsps0/client.rs +++ b/lightning-liquidity/src/lsps0/client.rs @@ -61,6 +61,8 @@ where fn handle_response( &self, response: LSPS0Response, counterparty_node_id: &PublicKey, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + match response { LSPS0Response::ListProtocols(ListProtocolsResponse { protocols }) => { self.pending_events.enqueue(Event::LSPS0Client( diff --git a/lightning-liquidity/src/lsps1/client.rs b/lightning-liquidity/src/lsps1/client.rs index 7b91cd8bdcd..430569f768d 100644 --- a/lightning-liquidity/src/lsps1/client.rs +++ b/lightning-liquidity/src/lsps1/client.rs @@ -104,8 +104,9 @@ where fn handle_get_info_response( &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse, ) -> Result<(), LightningError> { - let outer_state_lock = self.per_peer_state.write().unwrap(); + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.write().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { let mut peer_state_lock = inner_state_lock.lock().unwrap(); @@ -142,6 +143,8 @@ where fn handle_get_info_error( &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { @@ -213,6 +216,8 @@ where &self, request_id: RequestId, counterparty_node_id: &PublicKey, response: CreateOrderResponse, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { @@ -254,6 +259,8 @@ where fn handle_create_order_error( &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { @@ -326,6 +333,8 @@ where &self, request_id: RequestId, counterparty_node_id: &PublicKey, response: CreateOrderResponse, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { @@ -367,6 +376,8 @@ where fn handle_get_order_error( &self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { diff --git a/lightning-liquidity/src/lsps2/client.rs b/lightning-liquidity/src/lsps2/client.rs index 32288c4a54f..5ba54008f51 100644 --- a/lightning-liquidity/src/lsps2/client.rs +++ b/lightning-liquidity/src/lsps2/client.rs @@ -185,6 +185,8 @@ where fn handle_get_info_response( &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { @@ -251,6 +253,8 @@ where fn handle_buy_response( &self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse, ) -> Result<(), LightningError> { + let _event_queue_notifier = self.pending_events.notifier(); + let outer_state_lock = self.per_peer_state.read().unwrap(); match outer_state_lock.get(counterparty_node_id) { Some(inner_state_lock) => { diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index e909ba177e9..dbaa14b8fdc 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -740,6 +740,8 @@ where &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64, payment_hash: PaymentHash, ) -> Result<(), APIError> { + let _event_queue_notifier = self.pending_events.notifier(); + let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap(); if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -1029,6 +1031,8 @@ where &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest, ) -> Result<(), LightningError> { let _msg_queue_notifier = self.pending_messages.notifier(); + let _event_queue_notifier = self.pending_events.notifier(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); let inner_state_lock = get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id); @@ -1056,6 +1060,8 @@ where &self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest, ) -> Result<(), LightningError> { let _msg_queue_notifier = self.pending_messages.notifier(); + let _event_queue_notifier = self.pending_events.notifier(); + if let Some(payment_size_msat) = params.payment_size_msat { if payment_size_msat < params.opening_fee_params.min_payment_size_msat { let response = LSPS2Response::BuyError(ResponseError {