Skip to content

Commit

Permalink
migrate to timed-map entirely
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Jan 9, 2025
1 parent 1908a2e commit ff4038d
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 612 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ mm2_db = { path = "../mm2_db" }
mm2_metamask = { path = "../mm2_metamask" }
mm2_test_helpers = { path = "../mm2_test_helpers" }
time = { version = "0.3.20", features = ["wasm-bindgen"] }
timed-map = { version = "1.2", features = ["rustc-hash", "wasm"] }
tonic = { version = "0.10", default-features = false, features = ["prost", "codegen", "gzip"] }
tower-service = "0.3"
wasm-bindgen = "0.2.86"
Expand All @@ -148,6 +149,7 @@ lightning-net-tokio = "0.0.113"
rust-ini = { version = "0.13" }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
secp256k1v24 = { version = "0.24", package = "secp256k1" }
timed-map = { version = "1.2", features = ["rustc-hash"] }
tokio = { version = "1.20" }
tokio-rustls = { version = "0.24" }
tonic = { version = "0.10", features = ["tls", "tls-webpki-roots", "gzip"] }
Expand Down
10 changes: 5 additions & 5 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::eth::web3_transport::Web3SendOut;
use crate::eth::{EthCoin, RpcTransportEventHandlerShared};
use crate::{MmCoin, RpcTransportEventHandler};
use common::executor::{AbortSettings, SpawnAbortable, Timer};
use common::expirable_map::ExpirableMap;
use common::log;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
Expand All @@ -25,6 +24,7 @@ use proxy_signature::{ProxySign, RawMessage};
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::{AtomicUsize, Ordering},
Arc};
use timed_map::{StdClock, TimedMap};
use tokio_tungstenite_wasm::WebSocketStream;
use web3::error::{Error, TransportError};
use web3::helpers::to_string;
Expand Down Expand Up @@ -136,15 +136,15 @@ impl WebsocketTransport {
&self,
request: Option<ControllerMessage>,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
response_notifiers: &mut TimedMap<StdClock, usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match request {
Some(ControllerMessage::Request(WsRequest {
request_id,
serialized_request,
response_notifier,
})) => {
response_notifiers.insert(
response_notifiers.insert_expirable_unchecked(
request_id,
response_notifier,
// Since request will be cancelled when timeout occurs, we are free to drop its state.
Expand Down Expand Up @@ -187,7 +187,7 @@ impl WebsocketTransport {
async fn handle_response(
&self,
message: Option<Result<tokio_tungstenite_wasm::Message, tokio_tungstenite_wasm::Error>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
response_notifiers: &mut TimedMap<StdClock, usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match message {
Some(Ok(tokio_tungstenite_wasm::Message::Text(inc_event))) => {
Expand Down Expand Up @@ -248,7 +248,7 @@ impl WebsocketTransport {
let _guard = self.connection_guard.lock().await;

// List of awaiting requests
let mut response_notifiers: ExpirableMap<RequestId, oneshot::Sender<Vec<u8>>> = ExpirableMap::default();
let mut response_notifiers: TimedMap<StdClock, RequestId, oneshot::Sender<Vec<u8>>> = TimedMap::default();

let mut wsocket = match self
.attempt_to_establish_socket_connection(MAX_ATTEMPTS, SLEEP_DURATION)
Expand Down
8 changes: 4 additions & 4 deletions mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use crate::{RpcTransportEventHandler, SharableRpcTransportEventHandler};
use common::custom_futures::timeout::FutureTimerExt;
use common::executor::{abortable_queue::AbortableQueue, abortable_queue::WeakSpawner, AbortableSystem, SpawnFuture,
Timer};
use common::expirable_map::ExpirableMap;
use common::jsonrpc_client::{JsonRpcBatchResponse, JsonRpcErrorType, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
JsonRpcResponseEnum};
use common::log::{error, info};
use common::{now_float, now_ms};
use mm2_rpc::data::legacy::ElectrumProtocol;
use timed_map::{MapKind, StdClock, TimedMap};

use std::io;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
Expand Down Expand Up @@ -48,7 +48,7 @@ cfg_wasm32! {
use std::sync::atomic::AtomicUsize;
}

pub type JsonRpcPendingRequests = ExpirableMap<JsonRpcId, async_oneshot::Sender<JsonRpcResponseEnum>>;
pub type JsonRpcPendingRequests = TimedMap<StdClock, JsonRpcId, async_oneshot::Sender<JsonRpcResponseEnum>>;

macro_rules! disconnect_and_return {
($typ:tt, $err:expr, $conn:expr, $handlers:expr) => {{
Expand Down Expand Up @@ -177,7 +177,7 @@ impl ElectrumConnection {
settings,
tx: Mutex::new(None),
establishing_connection: AsyncMutex::new(()),
responses: Mutex::new(JsonRpcPendingRequests::new()),
responses: Mutex::new(JsonRpcPendingRequests::new_with_map_kind(MapKind::BTreeMap)),
protocol_version: Mutex::new(None),
last_error: Mutex::new(None),
abortable_system,
Expand Down Expand Up @@ -251,7 +251,7 @@ impl ElectrumConnection {
self.responses
.lock()
.unwrap()
.insert(rpc_id, req_tx, Duration::from_secs_f64(timeout));
.insert_expirable_unchecked(rpc_id, req_tx, Duration::from_secs_f64(timeout));
let tx = self
.tx
.lock()
Expand Down
2 changes: 0 additions & 2 deletions mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ pub mod crash_reports;
pub mod custom_futures;
pub mod custom_iter;
#[path = "executor/mod.rs"] pub mod executor;
pub mod expirable_map;
pub mod notifier;
pub mod number_type_casting;
pub mod password_policy;
pub mod seri;
pub mod time_cache;

#[cfg(not(target_arch = "wasm32"))]
#[path = "wio.rs"]
Expand Down
169 changes: 0 additions & 169 deletions mm2src/common/expirable_map.rs

This file was deleted.

Loading

0 comments on commit ff4038d

Please sign in to comment.