Skip to content

Commit

Permalink
Introduce EventQueueNotifierGuard type
Browse files Browse the repository at this point in the history
Previously, when enqueuing new events to the `EventQueue`, we'd
directly attempt to wake any notifiers/notify any threads waiting on
the `Condvar` about the newly available events. This could of course
mean we'd notify them while ourselves still holding some locks, e.g., on
the peer state.

Here, we instead introduce a `EventQueueNotifierGuard` type that will
notify about pending events if necesssary, which mitigates any potential
lock contention: we now simply have to ensure that any method calling
`enqueue` holds the notifier before retrieving any locks.
  • Loading branch information
tnull committed Jan 20, 2025
1 parent 4f15f5b commit 90a1b34
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 3 deletions.
47 changes: 45 additions & 2 deletions lightning-liquidity/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) struct EventQueue {
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
#[cfg(feature = "std")]
condvar: crate::sync::Condvar,
condvar: Arc<crate::sync::Condvar>,
}

impl EventQueue {
Expand All @@ -40,7 +40,7 @@ impl EventQueue {
let waker = Arc::new(Mutex::new(None));
#[cfg(feature = "std")]
{
let condvar = crate::sync::Condvar::new();
let condvar = Arc::new(crate::sync::Condvar::new());
Self { queue, waker, condvar }
}
#[cfg(not(feature = "std"))]
Expand Down Expand Up @@ -98,6 +98,49 @@ impl EventQueue {
pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
self.queue.lock().unwrap().split_off(0).into()
}

// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
pub fn notifier(&self) -> EventQueueNotifierGuard {
#[cfg(feature = "std")]
{
EventQueueNotifierGuard {
queue: Arc::clone(&self.queue),
waker: Arc::clone(&self.waker),
condvar: Arc::clone(&self.condvar),
}
}
#[cfg(not(feature = "std"))]
{
EventQueueNotifierGuard {
queue: Arc::clone(&self.queue),
waker: Arc::clone(&self.waker),
}
}
}
}

// A guard type that will notify about new events when dropped.
#[must_use]
pub(crate) struct EventQueueNotifierGuard {
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
#[cfg(feature = "std")]
condvar: Arc<crate::sync::Condvar>,
}

impl Drop for EventQueueNotifierGuard {
fn drop(&mut self) {
let should_notify = !self.queue.lock().unwrap().is_empty();

if should_notify {
if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}

#[cfg(feature = "std")]
self.condvar.notify_one();
}
}
}

/// An event which you should probably take some action in response to.
Expand Down
2 changes: 2 additions & 0 deletions lightning-liquidity/src/lsps0/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ where
fn handle_response(
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

match response {
LSPS0Response::ListProtocols(ListProtocolsResponse { protocols }) => {
self.pending_events.enqueue(Event::LSPS0Client(
Expand Down
13 changes: 12 additions & 1 deletion lightning-liquidity/src/lsps1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ where
fn handle_get_info_response(
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse,
) -> Result<(), LightningError> {
let outer_state_lock = self.per_peer_state.write().unwrap();
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.write().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();
Expand Down Expand Up @@ -142,6 +143,8 @@ where
fn handle_get_info_error(
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down Expand Up @@ -213,6 +216,8 @@ where
&self, request_id: RequestId, counterparty_node_id: &PublicKey,
response: CreateOrderResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down Expand Up @@ -254,6 +259,8 @@ where
fn handle_create_order_error(
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down Expand Up @@ -326,6 +333,8 @@ where
&self, request_id: RequestId, counterparty_node_id: &PublicKey,
response: CreateOrderResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down Expand Up @@ -367,6 +376,8 @@ where
fn handle_get_order_error(
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down
4 changes: 4 additions & 0 deletions lightning-liquidity/src/lsps2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ where
fn handle_get_info_response(
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down Expand Up @@ -251,6 +253,8 @@ where
fn handle_buy_response(
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down
6 changes: 6 additions & 0 deletions lightning-liquidity/src/lsps2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,8 @@ where
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
payment_hash: PaymentHash,
) -> Result<(), APIError> {
let _event_queue_notifier = self.pending_events.notifier();

let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
let outer_state_lock = self.per_peer_state.read().unwrap();
Expand Down Expand Up @@ -1029,6 +1031,8 @@ where
&self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest,
) -> Result<(), LightningError> {
let _msg_queue_notifier = self.pending_messages.notifier();
let _event_queue_notifier = self.pending_events.notifier();

let mut outer_state_lock = self.per_peer_state.write().unwrap();
let inner_state_lock =
get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id);
Expand Down Expand Up @@ -1056,6 +1060,8 @@ where
&self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest,
) -> Result<(), LightningError> {
let _msg_queue_notifier = self.pending_messages.notifier();
let _event_queue_notifier = self.pending_events.notifier();

if let Some(payment_size_msat) = params.payment_size_msat {
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
let response = LSPS2Response::BuyError(ResponseError {
Expand Down

0 comments on commit 90a1b34

Please sign in to comment.