Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed May 10, 2024
1 parent 455c067 commit e867d9a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 37 deletions.
62 changes: 36 additions & 26 deletions src/extensions/client/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,24 @@ impl Endpoint {
params: Vec<serde_json::Value>,
timeout: Duration,
) -> Result<serde_json::Value, jsonrpsee::core::Error> {
let client = self
.client_rx
.borrow()
.clone()
.ok_or(errors::failed("client not connected"))?;

match tokio::time::timeout(timeout, client.request(method, params.clone())).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => {
self.health.on_error(&err);
Err(err)
match tokio::time::timeout(timeout, async {
self.connected().await;
let client = self
.client_rx
.borrow()
.clone()
.ok_or(errors::failed("client not connected"))?;
match client.request(method, params.clone()).await {
Ok(resp) => Ok(resp),
Err(err) => {
self.health.on_error(&err);
Err(err)
}
}
})
.await
{
Ok(res) => res,
Err(_) => {
tracing::error!("request timed out method: {method} params: {params:?}");
self.health.on_error(&jsonrpsee::core::Error::RequestTimeout);
Expand All @@ -161,23 +167,27 @@ impl Endpoint {
unsubscribe_method: &str,
timeout: Duration,
) -> Result<Subscription<serde_json::Value>, jsonrpsee::core::Error> {
let client = self
.client_rx
.borrow()
.clone()
.ok_or(errors::failed("client not connected"))?;

match tokio::time::timeout(
timeout,
client.subscribe(subscribe_method, params.clone(), unsubscribe_method),
)
match tokio::time::timeout(timeout, async {
self.connected().await;
let client = self
.client_rx
.borrow()
.clone()
.ok_or(errors::failed("client not connected"))?;
match client
.subscribe(subscribe_method, params.clone(), unsubscribe_method)
.await
{
Ok(resp) => Ok(resp),
Err(err) => {
self.health.on_error(&err);
Err(err)
}
}
})
.await
{
Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => {
self.health.on_error(&err);
Err(err)
}
Ok(res) => res,
Err(_) => {
tracing::error!("subscribe timed out subscribe: {subscribe_method} params: {params:?}");
self.health.on_error(&jsonrpsee::core::Error::RequestTimeout);
Expand Down
2 changes: 0 additions & 2 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ impl Client {
}
};

selected_endpoint.connected().await;

let handle_message = |message: Message, endpoint: Arc<Endpoint>, rotation_notify: Arc<Notify>| {
let tx = message_tx_bg.clone();
let request_backoff_counter = request_backoff_counter.clone();
Expand Down
21 changes: 12 additions & 9 deletions src/extensions/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,23 @@ async fn concurrent_requests() {
let req2 = rx.recv().await.unwrap();
let req3 = rx.recv().await.unwrap();

req1.respond(JsonValue::from_str("1").unwrap());
req2.respond(JsonValue::from_str("2").unwrap());
req3.respond(JsonValue::from_str("3").unwrap());
let p1 = req1.params.clone();
let p2 = req2.params.clone();
let p3 = req3.params.clone();
req1.respond(p1);
req2.respond(p2);
req3.respond(p3);
});

let res1 = client.request("mock_rpc", vec![]);
let res2 = client.request("mock_rpc", vec![]);
let res3 = client.request("mock_rpc", vec![]);
let res1 = client.request("mock_rpc", vec![json!(1)]);
let res2 = client.request("mock_rpc", vec![json!(2)]);
let res3 = client.request("mock_rpc", vec![json!(3)]);

let res = tokio::join!(res1, res2, res3);

assert_eq!(res.0.unwrap().to_string(), "1");
assert_eq!(res.1.unwrap().to_string(), "2");
assert_eq!(res.2.unwrap().to_string(), "3");
assert_eq!(res.0.unwrap(), json!([1]));
assert_eq!(res.1.unwrap(), json!([2]));
assert_eq!(res.2.unwrap(), json!([3]));

handle.stop().unwrap();
task.await.unwrap();
Expand Down

0 comments on commit e867d9a

Please sign in to comment.