Skip to content

Commit 06e4dc7

Browse files
committed
impl correlated message handling for inbound messages
1 parent 27cf3fb commit 06e4dc7

File tree

5 files changed

+66
-96
lines changed

5 files changed

+66
-96
lines changed

mm2src/kdf_walletconnect/src/inbound_message.rs

+22-23
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ use crate::{error::WalletConnectError,
99
update::reply_session_update_request},
1010
WalletConnectCtxImpl};
1111

12-
use common::log::{info, LogOnError};
13-
use futures::{channel::mpsc::UnboundedSender, sink::SinkExt};
14-
use mm2_err_handle::prelude::{MmError, MmResult};
12+
use common::log::info;
13+
use mm2_err_handle::prelude::{MapToMmResult, MmError, MmResult};
1514
use relay_rpc::domain::{MessageId, Topic};
1615
use relay_rpc::rpc::{params::ResponseParamsSuccess, Params, Request, Response};
1716

18-
pub(crate) type SessionMessageType = MmResult<SessionMessage, String>;
17+
pub(crate) type SessionMessageType = MmResult<SessionMessage, WalletConnectError>;
1918

2019
#[derive(Debug)]
2120
pub struct SessionMessage {
@@ -61,31 +60,31 @@ pub(crate) async fn process_inbound_request(
6160
/// Processes an inbound WalletConnect response and sends the result to the provided message channel.
6261
///
6362
/// Handles successful responses, errors, and specific session proposal processing.
64-
pub(crate) async fn process_inbound_response(
65-
ctx: &WalletConnectCtxImpl,
66-
response: Response,
67-
topic: &Topic,
68-
mut message_tx: UnboundedSender<SessionMessageType>,
69-
) {
63+
pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtxImpl, response: Response, topic: &Topic) {
7064
let message_id = response.id();
71-
let result = match response {
72-
Response::Success(value) => match serde_json::from_value::<ResponseParamsSuccess>(value.result) {
73-
Ok(data) => {
74-
// TODO: maybe move to [send_proposal_request] and spawn in a different thread
75-
if let ResponseParamsSuccess::SessionPropose(propose) = &data {
76-
process_session_propose_response(ctx, topic, propose).await.error_log();
77-
}
78-
79-
Ok(SessionMessage {
65+
let result = match &response {
66+
Response::Success(value) => {
67+
let data = serde_json::from_value::<ResponseParamsSuccess>(value.result.clone());
68+
if let Ok(ResponseParamsSuccess::SessionPropose(propose)) = &data {
69+
let mut pending_requests = ctx.pending_requests.lock().await;
70+
pending_requests.remove(&message_id);
71+
let _ = process_session_propose_response(ctx, topic, propose).await;
72+
return;
73+
};
74+
data.map_to_mm(|err| WalletConnectError::SerdeError(err.to_string()))
75+
.map(|data| SessionMessage {
8076
message_id,
8177
topic: topic.clone(),
8278
data,
8379
})
84-
},
85-
Err(e) => MmError::err(e.to_string()),
8680
},
87-
Response::Error(err) => MmError::err(format!("{err:?}")),
81+
Response::Error(err) => MmError::err(WalletConnectError::UnSuccessfulResponse(format!("{err:?}"))),
8882
};
8983

90-
message_tx.send(result).await.error_log();
84+
let mut pending_requests = ctx.pending_requests.lock().await;
85+
if let Some(tx) = pending_requests.remove(&message_id) {
86+
tx.send(result).ok();
87+
} else {
88+
common::log::error!("[{topic}] unrecognized inbound response/message: {response:?}");
89+
};
9190
}

mm2src/kdf_walletconnect/src/lib.rs

+29-51
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use common::log::{debug, info, LogOnError};
1818
use common::{executor::SpawnFuture, log::error};
1919
use connection_handler::Handler;
2020
use error::WalletConnectError;
21-
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
21+
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
2222
use futures::lock::Mutex;
2323
use futures::StreamExt;
2424
use inbound_message::{process_inbound_request, process_inbound_response, SessionMessageType};
@@ -39,12 +39,12 @@ use serde::de::DeserializeOwned;
3939
use session::rpc::delete::send_session_delete_request;
4040
use session::Session;
4141
use session::{key::SymKeyPair, SessionManager};
42-
use std::collections::BTreeSet;
42+
use std::collections::{BTreeSet, HashMap};
4343
use std::ops::Deref;
4444
use std::{sync::Arc, time::Duration};
4545
use storage::SessionStorageDb;
4646
use storage::WalletConnectStorageOps;
47-
use tokio::time::timeout;
47+
use tokio::sync::oneshot;
4848
use wc_common::{decode_and_decrypt_type0, encrypt_and_encode, EnvelopeType, SymKey};
4949

5050
const PUBLISH_TIMEOUT_SECS: f64 = 6.;
@@ -79,7 +79,7 @@ pub struct WalletConnectCtxImpl {
7979
pub(crate) key_pair: SymKeyPair,
8080
relay: Relay,
8181
metadata: Metadata,
82-
message_rx: Mutex<UnboundedReceiver<SessionMessageType>>,
82+
pending_requests: Mutex<HashMap<MessageId, oneshot::Sender<SessionMessageType>>>,
8383
abortable_system: AbortableQueue,
8484
}
8585

@@ -103,7 +103,6 @@ impl WalletConnectCtx {
103103
};
104104
let (inbound_message_tx, mut inbound_message_rx) = unbounded();
105105
let (conn_live_sender, conn_live_receiver) = unbounded();
106-
let (message_tx, message_rx) = unbounded();
107106
let (client, _) = Client::new_with_callback(
108107
Handler::new("Komodefi", inbound_message_tx, conn_live_sender),
109108
|r, h| abortable_system.weak_spawner().spawn(client_event_loop(r, h)),
@@ -116,7 +115,7 @@ impl WalletConnectCtx {
116115
metadata: generate_metadata(),
117116
key_pair: SymKeyPair::new(),
118117
session_manager: SessionManager::new(storage),
119-
message_rx: message_rx.into(),
118+
pending_requests: Default::default(),
120119
abortable_system,
121120
});
122121

@@ -130,7 +129,7 @@ impl WalletConnectCtx {
130129
let context = context.clone();
131130
async move {
132131
while let Some(msg) = inbound_message_rx.next().await {
133-
if let Err(e) = context.handle_published_message(msg, message_tx.clone()).await {
132+
if let Err(e) = context.handle_published_message(msg).await {
134133
error!("Error processing message: {:?}", e);
135134
}
136135
}
@@ -272,11 +271,7 @@ impl WalletConnectCtxImpl {
272271
}
273272

274273
/// Handles an inbound published message by decrypting, decoding, and processing it.
275-
async fn handle_published_message(
276-
&self,
277-
msg: PublishedMessage,
278-
message_tx: UnboundedSender<SessionMessageType>,
279-
) -> MmResult<(), WalletConnectError> {
274+
async fn handle_published_message(&self, msg: PublishedMessage) -> MmResult<(), WalletConnectError> {
280275
let message = {
281276
let key = self.sym_key(&msg.topic)?;
282277
decode_and_decrypt_type0(msg.message.as_bytes(), &key)?
@@ -286,7 +281,7 @@ impl WalletConnectCtxImpl {
286281

287282
match serde_json::from_str(&message)? {
288283
Payload::Request(request) => process_inbound_request(self, request, &msg.topic).await?,
289-
Payload::Response(response) => process_inbound_response(self, response, &msg.topic, message_tx).await,
284+
Payload::Response(response) => process_inbound_response(self, response, &msg.topic).await,
290285
}
291286

292287
info!("[{}] Inbound message was handled successfully", msg.topic);
@@ -333,6 +328,7 @@ impl WalletConnectCtxImpl {
333328
.collect::<Vec<_>>();
334329

335330
if !all_topics.is_empty() {
331+
println!("SUBBING: {all_topics:?}");
336332
self.client.batch_subscribe(all_topics).await?;
337333
}
338334

@@ -344,13 +340,20 @@ impl WalletConnectCtxImpl {
344340
&self,
345341
topic: &Topic,
346342
param: RequestParams,
347-
message_id: MessageId,
348-
) -> MmResult<(), WalletConnectError> {
343+
) -> MmResult<(oneshot::Receiver<SessionMessageType>, Duration), WalletConnectError> {
349344
let irn_metadata = param.irn_metadata();
345+
let ttl = irn_metadata.ttl;
346+
let message_id = MessageIdGenerator::new().next();
350347
let request = Request::new(message_id, param.into());
351348

352349
self.publish_payload(topic, irn_metadata, Payload::Request(request))
353-
.await
350+
.await?;
351+
352+
let (tx, rx) = oneshot::channel();
353+
let mut pending_requests = self.pending_requests.lock().await;
354+
pending_requests.insert(message_id, tx);
355+
356+
Ok((rx, Duration::from_secs(ttl)))
354357
}
355358

356359
/// Private function to publish a success request response.
@@ -581,41 +584,16 @@ impl WalletConnectCtxImpl {
581584
},
582585
};
583586
let request = RequestParams::SessionRequest(request);
584-
let ttl = request.irn_metadata().ttl;
585-
let message_id = MessageIdGenerator::new().next();
586-
self.publish_request(&active_topic, request, message_id).await?;
587-
588-
match timeout(Duration::from_secs(ttl), async {
589-
// Check if the message exists and matches the expected message ID and
590-
// wait till we get a message with expected id or timeout.
591-
loop {
592-
let next_message = {
593-
let mut lock = self.message_rx.lock().await;
594-
lock.next().await
595-
};
596-
597-
if let Some(Ok(message)) = &next_message {
598-
if message.message_id == message_id {
599-
return next_message;
600-
}
601-
}
602-
}
603-
})
604-
.await
605-
{
606-
Ok(Some(result)) => {
607-
let result = result.mm_err(WalletConnectError::InternalError);
608-
match result?.data {
609-
ResponseParamsSuccess::Arbitrary(data) => {
610-
let data = serde_json::from_value::<T>(data)
611-
.map_err(|e| WalletConnectError::SerdeError(e.to_string()))?;
612-
callback(data)
613-
},
614-
_ => MmError::err(WalletConnectError::PayloadError("Unexpected response type".to_string())),
615-
}
616-
},
617-
Ok(None) => MmError::err(WalletConnectError::NoWalletFeedback),
618-
Err(_) => MmError::err(WalletConnectError::TimeoutError),
587+
let (rx, ttl) = self.publish_request(&active_topic, request).await?;
588+
589+
let maybe_response = rx
590+
.timeout(ttl)
591+
.await
592+
.map_to_mm(|_| WalletConnectError::TimeoutError)?
593+
.map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))??;
594+
match maybe_response.data {
595+
ResponseParamsSuccess::Arbitrary(data) => callback(serde_json::from_value::<T>(data)?),
596+
_ => MmError::err(WalletConnectError::PayloadError("Unexpected response type".to_string())),
619597
}
620598
}
621599

mm2src/kdf_walletconnect/src/session/rpc/delete.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::{error::{WalletConnectError, USER_REQUESTED},
22
storage::WalletConnectStorageOps,
33
WalletConnectCtxImpl};
44

5-
use common::log::debug;
5+
use common::{custom_futures::timeout::FutureTimerExt, log::debug};
6+
use mm2_err_handle::map_to_mm::MapToMmResult;
67
use mm2_err_handle::prelude::{MapMmError, MmResult};
7-
use relay_client::MessageIdGenerator;
88
use relay_rpc::domain::{MessageId, Topic};
99
use relay_rpc::rpc::params::{session_delete::SessionDeleteRequest, RequestParams, ResponseParamsSuccess};
1010

@@ -29,9 +29,12 @@ pub(crate) async fn send_session_delete_request(
2929
message: "User Disconnected".to_owned(),
3030
};
3131
let param = RequestParams::SessionDelete(delete_request);
32-
let message_id = MessageIdGenerator::new().next();
32+
let (rx, ttl) = ctx.publish_request(session_topic, param).await?;
3333

34-
ctx.publish_request(session_topic, param, message_id).await?;
34+
rx.timeout(ttl)
35+
.await
36+
.map_to_mm(|_| WalletConnectError::TimeoutError)?
37+
.map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))??;
3538

3639
session_delete_cleanup(ctx, session_topic).await
3740
}
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
use std::time::Duration;
2-
31
use crate::{error::WalletConnectError, WalletConnectCtxImpl};
42

53
use common::custom_futures::timeout::FutureTimerExt;
6-
use futures::StreamExt;
74
use mm2_err_handle::prelude::*;
8-
use relay_client::MessageIdGenerator;
95
use relay_rpc::{domain::{MessageId, Topic},
10-
rpc::params::{RelayProtocolMetadata, RequestParams, ResponseParamsSuccess}};
6+
rpc::params::{RequestParams, ResponseParamsSuccess}};
117

128
pub(crate) async fn reply_session_ping_request(
139
ctx: &WalletConnectCtxImpl,
@@ -22,15 +18,11 @@ pub(crate) async fn reply_session_ping_request(
2218

2319
pub async fn send_session_ping_request(ctx: &WalletConnectCtxImpl, topic: &Topic) -> MmResult<(), WalletConnectError> {
2420
let param = RequestParams::SessionPing(());
25-
let ttl = param.irn_metadata().ttl;
26-
let message_id = MessageIdGenerator::new().next();
27-
ctx.publish_request(topic, param, message_id).await?;
28-
29-
let wait_duration = Duration::from_secs(ttl);
30-
if let Ok(Some(resp)) = ctx.message_rx.lock().await.next().timeout(wait_duration).await {
31-
resp.mm_err(WalletConnectError::InternalError)?;
32-
return Ok(());
33-
}
21+
let (rx, ttl) = ctx.publish_request(topic, param).await?;
22+
rx.timeout(ttl)
23+
.await
24+
.map_to_mm(|_| WalletConnectError::TimeoutError)?
25+
.map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))??;
3426

35-
MmError::err(WalletConnectError::PayloadError("Session Ping Error".to_owned()))
27+
Ok(())
3628
}

mm2src/kdf_walletconnect/src/session/rpc/propose.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::{error::WalletConnectError,
88
use chrono::Utc;
99
use mm2_err_handle::map_to_mm::MapToMmResult;
1010
use mm2_err_handle::prelude::*;
11-
use relay_client::MessageIdGenerator;
1211
use relay_rpc::rpc::params::session::ProposeNamespaces;
1312
use relay_rpc::{domain::{MessageId, Topic},
1413
rpc::params::{session_propose::{Proposer, SessionProposeRequest, SessionProposeResponse},
@@ -31,8 +30,7 @@ pub(crate) async fn send_proposal_request(
3130
required_namespaces,
3231
optional_namespaces,
3332
});
34-
let message_id = MessageIdGenerator::new().next();
35-
ctx.publish_request(topic, session_proposal, message_id).await?;
33+
let _ = ctx.publish_request(topic, session_proposal).await?;
3634

3735
Ok(())
3836
}

0 commit comments

Comments
 (0)