Make esplora call in parallel

This commit is contained in:
Riccardo Casatta 2020-11-03 22:09:32 +01:00
parent fe7ecd3dd2
commit 4c59809f8e
No known key found for this signature in database
GPG Key ID: FD986A969E450397

View File

@ -38,7 +38,7 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
use futures::stream::{self, StreamExt, TryStreamExt}; use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
#[allow(unused_imports)] #[allow(unused_imports)]
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
@ -56,8 +56,11 @@ use self::utils::{ELSGetHistoryRes, ElectrumLikeSync};
use super::*; use super::*;
use crate::database::BatchDatabase; use crate::database::BatchDatabase;
use crate::error::Error; use crate::error::Error;
use crate::wallet::utils::ChunksIterator;
use crate::FeeRate; use crate::FeeRate;
const CONCURRENT: usize = 4;
#[derive(Debug)] #[derive(Debug)]
struct UrlClient { struct UrlClient {
url: String, url: String,
@ -301,10 +304,16 @@ impl ElectrumLikeSync for UrlClient {
scripts: I, scripts: I,
) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error> { ) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error> {
let future = async { let future = async {
Ok(stream::iter(scripts) let mut results = vec![];
.then(|script| self._script_get_history(&script)) for chunk in ChunksIterator::new(scripts.into_iter(), CONCURRENT) {
.try_collect() let mut futs = FuturesOrdered::new();
.await?) for script in chunk {
futs.push(self._script_get_history(&script));
}
let partial_results: Vec<Vec<ELSGetHistoryRes>> = futs.try_collect().await?;
results.extend(partial_results);
}
Ok(stream::iter(results).collect().await)
}; };
await_or_block!(future) await_or_block!(future)
@ -315,10 +324,16 @@ impl ElectrumLikeSync for UrlClient {
txids: I, txids: I,
) -> Result<Vec<Transaction>, Error> { ) -> Result<Vec<Transaction>, Error> {
let future = async { let future = async {
Ok(stream::iter(txids) let mut results = vec![];
.then(|txid| self._get_tx_no_opt(&txid)) for chunk in ChunksIterator::new(txids.into_iter(), CONCURRENT) {
.try_collect() let mut futs = FuturesOrdered::new();
.await?) for txid in chunk {
futs.push(self._get_tx_no_opt(&txid));
}
let partial_results: Vec<Transaction> = futs.try_collect().await?;
results.extend(partial_results);
}
Ok(stream::iter(results).collect().await)
}; };
await_or_block!(future) await_or_block!(future)
@ -329,10 +344,16 @@ impl ElectrumLikeSync for UrlClient {
heights: I, heights: I,
) -> Result<Vec<BlockHeader>, Error> { ) -> Result<Vec<BlockHeader>, Error> {
let future = async { let future = async {
Ok(stream::iter(heights) let mut results = vec![];
.then(|h| self._get_header(h)) for chunk in ChunksIterator::new(heights.into_iter(), CONCURRENT) {
.try_collect() let mut futs = FuturesOrdered::new();
.await?) for height in chunk {
futs.push(self._get_header(height));
}
let partial_results: Vec<BlockHeader> = futs.try_collect().await?;
results.extend(partial_results);
}
Ok(stream::iter(results).collect().await)
}; };
await_or_block!(future) await_or_block!(future)