From 4c59809f8e38ce9e39aa4ff8b750ef531cc35b70 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 3 Nov 2020 22:09:32 +0100 Subject: [PATCH] Make esplora call in parallel --- src/blockchain/esplora.rs | 47 ++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/src/blockchain/esplora.rs b/src/blockchain/esplora.rs index e2da5b80..d236e28f 100644 --- a/src/blockchain/esplora.rs +++ b/src/blockchain/esplora.rs @@ -38,7 +38,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; -use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; #[allow(unused_imports)] use log::{debug, error, info, trace}; @@ -56,8 +56,11 @@ use self::utils::{ELSGetHistoryRes, ElectrumLikeSync}; use super::*; use crate::database::BatchDatabase; use crate::error::Error; +use crate::wallet::utils::ChunksIterator; use crate::FeeRate; +const CONCURRENT: usize = 4; + #[derive(Debug)] struct UrlClient { url: String, @@ -301,10 +304,16 @@ impl ElectrumLikeSync for UrlClient { scripts: I, ) -> Result>, Error> { let future = async { - Ok(stream::iter(scripts) - .then(|script| self._script_get_history(&script)) - .try_collect() - .await?) + let mut results = vec![]; + for chunk in ChunksIterator::new(scripts.into_iter(), CONCURRENT) { + let mut futs = FuturesOrdered::new(); + for script in chunk { + futs.push(self._script_get_history(&script)); + } + let partial_results: Vec> = futs.try_collect().await?; + results.extend(partial_results); + } + Ok(stream::iter(results).collect().await) }; await_or_block!(future) @@ -315,10 +324,16 @@ impl ElectrumLikeSync for UrlClient { txids: I, ) -> Result, Error> { let future = async { - Ok(stream::iter(txids) - .then(|txid| self._get_tx_no_opt(&txid)) - .try_collect() - .await?) + let mut results = vec![]; + for chunk in ChunksIterator::new(txids.into_iter(), CONCURRENT) { + let mut futs = FuturesOrdered::new(); + for txid in chunk { + futs.push(self._get_tx_no_opt(&txid)); + } + let partial_results: Vec = futs.try_collect().await?; + results.extend(partial_results); + } + Ok(stream::iter(results).collect().await) }; await_or_block!(future) @@ -329,10 +344,16 @@ impl ElectrumLikeSync for UrlClient { heights: I, ) -> Result, Error> { let future = async { - Ok(stream::iter(heights) - .then(|h| self._get_header(h)) - .try_collect() - .await?) + let mut results = vec![]; + for chunk in ChunksIterator::new(heights.into_iter(), CONCURRENT) { + let mut futs = FuturesOrdered::new(); + for height in chunk { + futs.push(self._get_header(height)); + } + let partial_results: Vec = futs.try_collect().await?; + results.extend(partial_results); + } + Ok(stream::iter(results).collect().await) }; await_or_block!(future)