diff --git a/src/daemon.rs b/src/daemon.rs index 90e38a931..81e2ce783 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -8,7 +8,7 @@ use std::time::Duration; use base64::prelude::{Engine, BASE64_STANDARD}; use hex::FromHex; -use itertools::Itertools; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] @@ -377,31 +377,16 @@ impl Daemon { Ok(result) } - fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn handle_request(&self, method: &str, params: &Value) -> Result { let id = self.message_id.next(); - let chunks = params_list - .iter() - .map(|params| json!({"method": method, "params": params, "id": id})) - .chunks(50_000); // Max Amount of batched requests - let mut results = vec![]; - for chunk in &chunks { - let reqs = chunk.collect(); - let mut replies = self.call_jsonrpc(method, &reqs)?; - if let Some(replies_vec) = replies.as_array_mut() { - for reply in replies_vec { - results.push(parse_jsonrpc_reply(reply.take(), method, id)?) - } - } else { - bail!("non-array replies: {:?}", replies); - } - } - - Ok(results) + let req = json!({"method": method, "params": params, "id": id}); + let reply = self.call_jsonrpc(method, &req)?; + parse_jsonrpc_reply(reply, method, id) } - fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn retry_request(&self, method: &str, params: &Value) -> Result { loop { - match self.handle_request_batch(method, params_list) { + match self.handle_request(method, ¶ms) { Err(Error(ErrorKind::Connection(msg), _)) => { warn!("reconnecting to bitcoind: {}", msg); self.signal.wait(Duration::from_secs(3), false)?; @@ -415,13 +400,16 @@ impl Daemon { } fn request(&self, method: &str, params: Value) -> Result { - let mut values = self.retry_request_batch(method, &[params])?; - assert_eq!(values.len(), 1); - Ok(values.remove(0)) + self.retry_request(method, ¶ms) } fn requests(&self, method: &str, params_list: &[Value]) -> Result> { - self.retry_request_batch(method, params_list) + // Send in parallel as individual JSONRPC requests, with no batching. + // See https://github.com/Blockstream/electrs/pull/33 + params_list + .par_iter() + .map(|params| self.retry_request(method, params)) + .collect() } // bitcoind JSONRPC API: