From 93d79d68f030ff98df6b92e0331b05e8d1b1556d Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Tue, 10 Sep 2024 15:54:10 +0300 Subject: [PATCH] properly respect the max_connected threshold let the background thread of the connection manager respect the `max_connected` threshold (not entirely really, we can reach up to `max_connected + 1`). This is done by refactoring the connection establishment to not establish all connections but only connections that we know we will maintain for sure. This also is lighter overall than the old approach. but it's also slower since connections are tried sequentially, we don't need the speed here anyways since if an electrum request fails we fail back to try all connections which is a logic that isn't controlled by the connection manager in any way. --- .../utxo/rpc_clients/electrum_rpc/client.rs | 13 ++- .../connection_manager/manager.rs | 83 +++++++++---------- 2 files changed, 52 insertions(+), 44 deletions(-) diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index e4bf768e0af..cda3dfa3715 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -307,7 +307,9 @@ impl ElectrumClient { let request = (req_id, request); // Use the active connections for this request. let connections = self.connection_manager.get_active_connections().await; - let concurrency = connections.len() as u32; + // Maximum number of connections to establish or use in request concurrently. + // Could be up to connections.len(). + let concurrency = 1; match self .send_request_using(&request, connections, send_to_all, concurrency) .await @@ -316,7 +318,14 @@ impl ElectrumClient { // If we failed the request using only the active connections, try again using all connections. Err(_) => { let connections = self.connection_manager.get_all_connections(); - match self.send_request_using(&request, connections, send_to_all, 1).await { + // The concurrency here must be `1`, because we are trying out connections that aren't maintained + // which means we might break the max connections rule. + // We will at most we will break this rule by `1` (have `max_connected + 1` open connections). + let concurrency = 1; + match self + .send_request_using(&request, connections, send_to_all, concurrency) + .await + { Ok(response) => Ok(response), Err(err_vec) => Err(JsonRpcErrorType::Internal(format!("All servers errored: {err_vec:?}"))), } diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs index 1de26f06954..271daad3319 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs @@ -1,5 +1,4 @@ use std::collections::{BTreeMap, HashMap}; -use std::iter::FromIterator; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}; use super::super::client::{ElectrumClient, ElectrumClientImpl}; @@ -12,11 +11,10 @@ use common::executor::abortable_queue::AbortableQueue; use common::executor::{AbortableSystem, SpawnFuture, Timer}; use common::notifier::{Notifiee, Notifier}; use common::now_ms; -use futures::stream::FuturesUnordered; use keys::Address; use futures::compat::Future01CompatExt; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; /// A macro to unwrap an option and *execute* some code if the option is None. macro_rules! unwrap_or_else { @@ -343,7 +341,7 @@ impl ConnectionManager { let mut min_connected_notification = unwrap_or_return!(self.extract_below_min_connected_notifiee()); loop { // Get the candidate connections that we will consider maintaining. - let (will_never_get_min_connected, candidate_connections, currently_connected) = { + let (will_never_get_min_connected, mut candidate_connections) = { let all_connections = self.read_connections(); let maintained_connections = self.read_maintained_connections(); let currently_connected = maintained_connections.len() as u32; @@ -353,13 +351,14 @@ impl ConnectionManager { let all_candidate_connections: Vec<_> = all_connections .iter() .filter_map(|(_, conn_ctx)| { - (!maintained_connections.contains_key(&conn_ctx.id)).then(|| conn_ctx.connection.clone()) + (!maintained_connections.contains_key(&conn_ctx.id)) + .then(|| (conn_ctx.connection.clone(), conn_ctx.id)) }) .collect(); // The candidate connections from above, but further filtered by whether they are suspended or not. let non_suspended_candidate_connections: Vec<_> = all_candidate_connections .iter() - .filter(|connection| { + .filter(|(connection, _)| { all_connections .get(connection.address()) .map_or(false, |conn_ctx| now_ms() > conn_ctx.suspended_till()) @@ -371,62 +370,62 @@ impl ConnectionManager { if connections_needed > all_candidate_connections.len() as u32 { // Not enough connections to cover the `min_connected` threshold. // This means we will never be able to maintain `min_connected` active connections. - (true, all_candidate_connections, currently_connected) + (true, all_candidate_connections) } else { // If we consider all candidate connection (but some are suspended), we can cover the needed connections. // We will consider the suspended ones since if we don't we will stay below `min_connected` threshold. - (false, all_candidate_connections, currently_connected) + (false, all_candidate_connections) } } else { // Non suspended candidates are enough to cover the needed connections. - (false, non_suspended_candidate_connections, currently_connected) + (false, non_suspended_candidate_connections) } }; // Establish the connections to the selected candidates and alter the maintained connections set accordingly. { let client = unwrap_or_return!(self.get_client()); - // This is the maximum connections we can have being established concurrently. - let allowed_concurrency = - self.config().max_connected.saturating_sub(currently_connected).max(1) as usize; - let connection_loop_chunks = candidate_connections.chunks(allowed_concurrency).map(|chunk| { - FuturesUnordered::from_iter(chunk.iter().map(|connection| { - let address = connection.address().to_string(); - connection - .establish_connection_loop(client.clone()) - .map(|res| (address, res)) - })) - }); + // Sort the candidate connections by their priority/ID. + candidate_connections.sort_by_key(|(_, priority)| *priority); + // Pick each chunk of connections and establish them concurrently. - for mut connection_loops in connection_loop_chunks { - while let Some((address, result)) = connection_loops.next().await { - if let Err(e) = result { + for (connection, _) in candidate_connections { + let address = connection.address().to_string(); + let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id; + let (maintained_connections_size, lowest_priority_connection_id) = { + let maintained_connections = self.read_maintained_connections(); + let maintained_connections_size = maintained_connections.len() as u32; + let lowest_priority_connection_id = + *maintained_connections.keys().next_back().unwrap_or(&u32::MAX); + (maintained_connections_size, lowest_priority_connection_id) + }; + + // We can only try to add the connection if: + // 1- We haven't reached the `max_connected` threshold. + // 2- We have reached the `max_connected` threshold but the connection has a higher priority than the lowest priority connection. + if maintained_connections_size < self.config().max_connected + || connection_id < lowest_priority_connection_id + { + // Now that we know the connection is good to be inserted, try to establish it. + if let Err(e) = connection.establish_connection_loop(client.clone()).await { // Remove the connection if it's not recoverable. if !e.is_recoverable() { self.remove_connection(&address).await.ok(); } continue; } - let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id; - let maintained_connections = self.read_maintained_connections(); - let maintained_connections_size = maintained_connections.len() as u32; - let lowest_priority_connection_id = - *maintained_connections.keys().next_back().unwrap_or(&u32::MAX); - // NOTE: Must drop to avoid deadlock with the write lock below. - drop(maintained_connections); - // We don't write-lock the maintained connections unless we know we will add this connection. - // That is, we can add it because we didn't hit the `max_connected` threshold, - // or we can add it because it is of a higher priority than the lowest priority connection. - if maintained_connections_size < self.config().max_connected - || connection_id < lowest_priority_connection_id - { - let mut maintained_connections = self.write_maintained_connections(); - maintained_connections.insert(connection_id, address); - // If we have reached the `max_connected` threshold then remove the lowest priority connection. - if !maintained_connections_size < self.config().max_connected { - maintained_connections.remove(&lowest_priority_connection_id); - } + let mut maintained_connections = self.write_maintained_connections(); + maintained_connections.insert(connection_id, address); + // If we have reached the `max_connected` threshold then remove the lowest priority connection. + if !maintained_connections_size < self.config().max_connected { + maintained_connections.remove(&lowest_priority_connection_id); } + } else { + // If any of the two conditions on the `if` statement above are not met, there is nothing to do. + // At this point we have already collected `max_connected` connections and also the current connection + // in the candidate list has a lower priority than the lowest priority maintained connection, and the next + // candidate connections as well since they are sorted by priority. + break; } } }