Skip to content

Commit 27cf3fb

Browse files
committed
track message via message_id
1 parent 1860776 commit 27cf3fb

File tree

6 files changed

+45
-16
lines changed

6 files changed

+45
-16
lines changed

mm2src/kdf_walletconnect/src/connection_handler.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub(crate) async fn handle_disconnections(
8282
loop {
8383
match this.reconnect_and_subscribe().await {
8484
Ok(_) => {
85-
error!("Reconnection process complete.");
85+
info!("Reconnection process complete.");
8686
backoff = 1;
8787
break;
8888
},

mm2src/kdf_walletconnect/src/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub enum WalletConnectError {
9090
InvalidChainId(String),
9191
#[error("ChainId not supported: {0}")]
9292
ChainIdNotSupported(String),
93+
#[error("Request timeout error")]
94+
TimeoutError,
9395
}
9496

9597
impl From<Error<PublishError>> for WalletConnectError {

mm2src/kdf_walletconnect/src/lib.rs

+33-12
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,9 @@ impl WalletConnectCtxImpl {
344344
&self,
345345
topic: &Topic,
346346
param: RequestParams,
347+
message_id: MessageId,
347348
) -> MmResult<(), WalletConnectError> {
348349
let irn_metadata = param.irn_metadata();
349-
let message_id = MessageIdGenerator::new().next();
350350
let request = Request::new(message_id, param.into());
351351

352352
self.publish_payload(topic, irn_metadata, Payload::Request(request))
@@ -463,6 +463,7 @@ impl WalletConnectCtxImpl {
463463
session: &Session,
464464
chain_id: &WcChainId,
465465
) -> MmResult<(), WalletConnectError> {
466+
println!("{:?}", session.namespaces.get(chain_id.chain.as_ref()));
466467
if let Some(Namespace {
467468
chains: Some(chains), ..
468469
}) = session.namespaces.get(chain_id.chain.as_ref())
@@ -581,21 +582,41 @@ impl WalletConnectCtxImpl {
581582
};
582583
let request = RequestParams::SessionRequest(request);
583584
let ttl = request.irn_metadata().ttl;
584-
self.publish_request(&active_topic, request).await?;
585-
586-
if let Ok(Some(resp)) = timeout(Duration::from_secs(ttl), async {
587-
self.message_rx.lock().await.next().await
585+
let message_id = MessageIdGenerator::new().next();
586+
self.publish_request(&active_topic, request, message_id).await?;
587+
588+
match timeout(Duration::from_secs(ttl), async {
589+
// Check if the message exists and matches the expected message ID and
590+
// wait till we get a message with expected id or timeout.
591+
loop {
592+
let next_message = {
593+
let mut lock = self.message_rx.lock().await;
594+
lock.next().await
595+
};
596+
597+
if let Some(Ok(message)) = &next_message {
598+
if message.message_id == message_id {
599+
return next_message;
600+
}
601+
}
602+
}
588603
})
589604
.await
590605
{
591-
let result = resp.mm_err(WalletConnectError::InternalError)?;
592-
if let ResponseParamsSuccess::Arbitrary(data) = result.data {
593-
let data = serde_json::from_value::<T>(data)?;
594-
return callback(data);
595-
}
606+
Ok(Some(result)) => {
607+
let result = result.mm_err(WalletConnectError::InternalError);
608+
match result?.data {
609+
ResponseParamsSuccess::Arbitrary(data) => {
610+
let data = serde_json::from_value::<T>(data)
611+
.map_err(|e| WalletConnectError::SerdeError(e.to_string()))?;
612+
callback(data)
613+
},
614+
_ => MmError::err(WalletConnectError::PayloadError("Unexpected response type".to_string())),
615+
}
616+
},
617+
Ok(None) => MmError::err(WalletConnectError::NoWalletFeedback),
618+
Err(_) => MmError::err(WalletConnectError::TimeoutError),
596619
}
597-
598-
MmError::err(WalletConnectError::NoWalletFeedback)
599620
}
600621

601622
pub async fn drop_session(&self, topic: &Topic) -> MmResult<(), WalletConnectError> {

mm2src/kdf_walletconnect/src/session/rpc/delete.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{error::{WalletConnectError, USER_REQUESTED},
44

55
use common::log::debug;
66
use mm2_err_handle::prelude::{MapMmError, MmResult};
7+
use relay_client::MessageIdGenerator;
78
use relay_rpc::domain::{MessageId, Topic};
89
use relay_rpc::rpc::params::{session_delete::SessionDeleteRequest, RequestParams, ResponseParamsSuccess};
910

@@ -28,8 +29,9 @@ pub(crate) async fn send_session_delete_request(
2829
message: "User Disconnected".to_owned(),
2930
};
3031
let param = RequestParams::SessionDelete(delete_request);
32+
let message_id = MessageIdGenerator::new().next();
3133

32-
ctx.publish_request(session_topic, param).await?;
34+
ctx.publish_request(session_topic, param, message_id).await?;
3335

3436
session_delete_cleanup(ctx, session_topic).await
3537
}

mm2src/kdf_walletconnect/src/session/rpc/ping.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::{error::WalletConnectError, WalletConnectCtxImpl};
55
use common::custom_futures::timeout::FutureTimerExt;
66
use futures::StreamExt;
77
use mm2_err_handle::prelude::*;
8+
use relay_client::MessageIdGenerator;
89
use relay_rpc::{domain::{MessageId, Topic},
910
rpc::params::{RelayProtocolMetadata, RequestParams, ResponseParamsSuccess}};
1011

@@ -22,7 +23,8 @@ pub(crate) async fn reply_session_ping_request(
2223
pub async fn send_session_ping_request(ctx: &WalletConnectCtxImpl, topic: &Topic) -> MmResult<(), WalletConnectError> {
2324
let param = RequestParams::SessionPing(());
2425
let ttl = param.irn_metadata().ttl;
25-
ctx.publish_request(topic, param).await?;
26+
let message_id = MessageIdGenerator::new().next();
27+
ctx.publish_request(topic, param, message_id).await?;
2628

2729
let wait_duration = Duration::from_secs(ttl);
2830
if let Ok(Some(resp)) = ctx.message_rx.lock().await.next().timeout(wait_duration).await {

mm2src/kdf_walletconnect/src/session/rpc/propose.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{error::WalletConnectError,
88
use chrono::Utc;
99
use mm2_err_handle::map_to_mm::MapToMmResult;
1010
use mm2_err_handle::prelude::*;
11+
use relay_client::MessageIdGenerator;
1112
use relay_rpc::rpc::params::session::ProposeNamespaces;
1213
use relay_rpc::{domain::{MessageId, Topic},
1314
rpc::params::{session_propose::{Proposer, SessionProposeRequest, SessionProposeResponse},
@@ -30,7 +31,8 @@ pub(crate) async fn send_proposal_request(
3031
required_namespaces,
3132
optional_namespaces,
3233
});
33-
ctx.publish_request(topic, session_proposal).await?;
34+
let message_id = MessageIdGenerator::new().next();
35+
ctx.publish_request(topic, session_proposal, message_id).await?;
3436

3537
Ok(())
3638
}

0 commit comments

Comments
 (0)