diff --git a/backend/rust-gbt/Cargo.lock b/backend/rust-gbt/Cargo.lock index 0407bcc14..d8f1bd6ad 100644 --- a/backend/rust-gbt/Cargo.lock +++ b/backend/rust-gbt/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + [[package]] name = "cfg-if" version = "1.0.0" @@ -12,7 +24,26 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "gbt" version = "0.1.0" dependencies = [ + "bytes", "neon", + "once_cell", + "priority-queue", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown", ] [[package]] @@ -66,6 +97,22 @@ dependencies = [ "smallvec", ] +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "priority-queue" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff39edfcaec0d64e8d0da38564fad195d2d51b680940295fcc307366e101e61" +dependencies = [ + "autocfg", + "indexmap", +] + [[package]] name = "proc-macro2" version = "1.0.60" diff --git a/backend/rust-gbt/Cargo.toml b/backend/rust-gbt/Cargo.toml index abd55fbd8..87155c59d 100644 --- a/backend/rust-gbt/Cargo.toml +++ b/backend/rust-gbt/Cargo.toml @@ -14,6 +14,7 @@ crate-type = ["cdylib"] [dependencies] priority-queue = "1.3.2" bytes = "1.4.0" +once_cell = "1.18.0" [dependencies.neon] version = "0.10" diff --git a/backend/rust-gbt/src/gbt.rs b/backend/rust-gbt/src/gbt.rs index 98c0d9178..aecf2bff0 100644 --- a/backend/rust-gbt/src/gbt.rs +++ b/backend/rust-gbt/src/gbt.rs @@ -34,13 +34,7 @@ impl Ord for TxPriority { } } -pub fn gbt(mempool_array: Vec) -> (Vec>, Vec<(u32, f64)>, Vec>) { - let mut mempool: HashMap = HashMap::new(); - for transaction in mempool_array { - mempool.insert(transaction.uid, transaction); - } - - +pub fn gbt(mempool: &mut HashMap) -> (Vec>, Vec<(u32, f64)>, Vec>) { return make_block_templates(mempool); } @@ -49,13 +43,13 @@ pub fn gbt(mempool_array: Vec) -> (Vec>, Vec<(u32, f * (see BlockAssembler in https://github.com/bitcoin/bitcoin/blob/master/src/node/miner.cpp) * Ported from https://github.com/mempool/mempool/blob/master/backend/src/api/tx-selection-worker.ts */ -fn make_block_templates(mempool: HashMap) -> (Vec>, Vec<(u32, f64)>, Vec>) { +fn make_block_templates(mempool: &mut HashMap) -> (Vec>, Vec<(u32, f64)>, Vec>) { let mut audit_pool: HashMap = HashMap::new(); let mut mempool_array: VecDeque = VecDeque::new(); let mut cluster_array: Vec> = Vec::new(); // Initialize working structs - for (uid, tx) in &mempool { + for (uid, tx) in mempool { let audit_tx = AuditTransaction { uid: tx.uid, fee: tx.fee, diff --git a/backend/rust-gbt/src/lib.rs b/backend/rust-gbt/src/lib.rs index ff7d6c208..12583300a 100644 --- a/backend/rust-gbt/src/lib.rs +++ b/backend/rust-gbt/src/lib.rs @@ -1,11 +1,20 @@ use neon::{prelude::*, types::buffer::TypedArray}; +use std::collections::HashMap; +use std::ops::DerefMut; +use std::sync::Mutex; +use once_cell::sync::Lazy; mod gbt; mod thread_transaction; mod audit_transaction; -use thread_transaction::{ThreadTransaction}; +mod utils; +use thread_transaction::ThreadTransaction; -fn go(mut cx: FunctionContext) -> JsResult { +static THREAD_TRANSACTIONS: Lazy>> = Lazy::new(|| { + Mutex::new(HashMap::new()) +}); + +fn make(mut cx: FunctionContext) -> JsResult { let mempool_arg = cx.argument::(0)?.root(&mut cx).into_inner(&mut cx); let callback = cx.argument::(1)?.root(&mut cx); let channel = cx.channel(); @@ -13,8 +22,48 @@ fn go(mut cx: FunctionContext) -> JsResult { let buffer = mempool_arg.as_slice(&mut cx); let thread_transactions = ThreadTransaction::batch_from_buffer(buffer); + let mut map = THREAD_TRANSACTIONS.lock().unwrap(); + map.clear(); + for tx in thread_transactions { + map.insert(tx.uid, tx); + } + drop(map); + + run_in_thread(channel, callback); + + Ok(cx.undefined()) +} + +fn update(mut cx: FunctionContext) -> JsResult { + let new_txs_arg = cx.argument::(0)?.root(&mut cx).into_inner(&mut cx); + let remove_txs_arg = cx.argument::(1)?.root(&mut cx).into_inner(&mut cx); + let callback = cx.argument::(2)?.root(&mut cx); + let channel = cx.channel(); + + let mut map = THREAD_TRANSACTIONS.lock().unwrap(); + let new_tx_buffer = new_txs_arg.as_slice(&mut cx); + let thread_transactions = ThreadTransaction::batch_from_buffer(new_tx_buffer); + for tx in thread_transactions { + map.insert(tx.uid, tx); + } + + let remove_tx_buffer = remove_txs_arg.as_slice(&mut cx); + let remove_ids = utils::txids_from_buffer(remove_tx_buffer); + for txid in &remove_ids { + map.remove(txid); + } + drop(map); + + run_in_thread(channel, callback); + + Ok(cx.undefined()) +} + +fn run_in_thread(channel: Channel, callback: Root) { std::thread::spawn(move || { - let (blocks, rates, clusters) = gbt::gbt(thread_transactions); + let mut map = THREAD_TRANSACTIONS.lock().unwrap(); + let (blocks, rates, clusters) = gbt::gbt(map.deref_mut()); + drop(map); channel.send(move |mut cx| { let result = JsObject::new(&mut cx); @@ -64,12 +113,11 @@ fn go(mut cx: FunctionContext) -> JsResult { Ok(()) }); }); - - Ok(cx.undefined()) } #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { - cx.export_function("go", go)?; + cx.export_function("make", make)?; + cx.export_function("update", update)?; Ok(()) } diff --git a/backend/rust-gbt/src/thread_transaction.rs b/backend/rust-gbt/src/thread_transaction.rs index fa977c3e9..f429613a6 100644 --- a/backend/rust-gbt/src/thread_transaction.rs +++ b/backend/rust-gbt/src/thread_transaction.rs @@ -2,6 +2,7 @@ extern crate bytes; use std::io::Cursor; use bytes::buf::Buf; + pub struct ThreadTransaction { pub uid: u32, pub fee: u64, diff --git a/backend/rust-gbt/src/utils.rs b/backend/rust-gbt/src/utils.rs new file mode 100644 index 000000000..569b858e8 --- /dev/null +++ b/backend/rust-gbt/src/utils.rs @@ -0,0 +1,14 @@ +extern crate bytes; +use std::io::Cursor; +use bytes::buf::Buf; + +pub fn txids_from_buffer(buffer: &[u8]) -> Vec { + let mut txids: Vec = Vec::new(); + let mut cursor = Cursor::new(buffer); + let size = cursor.get_u32(); + for _ in 0..size { + txids.push(cursor.get_u32()); + } + + return txids; +} \ No newline at end of file diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index 7f4490477..b82946a9e 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -11,6 +11,7 @@ class MempoolBlocks { private mempoolBlocks: MempoolBlockWithTransactions[] = []; private mempoolBlockDeltas: MempoolBlockDelta[] = []; private txSelectionWorker: Worker | null = null; + private rustInitialized: boolean = false; private nextUid: number = 1; private uidMap: Map = new Map(); // map short numerical uids to full txids @@ -284,7 +285,7 @@ class MempoolBlocks { const start = Date.now(); for (const tx of Object.values(added)) { - this.setUid(tx); + this.setUid(tx, true); } const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[]; // prepare a stripped down version of the mempool with only the minimum necessary data @@ -337,23 +338,49 @@ class MempoolBlocks { // serialize relevant mempool data into an ArrayBuffer // to reduce the overhead of passing this data to the rust thread - const mempoolBuffer = this.mempoolToArrayBuffer(newMempool); + const mempoolBuffer = this.mempoolToArrayBuffer(Object.values(newMempool), newMempool); // run the block construction algorithm in a separate thread, and wait for a result try { - const { blocks, rates, clusters } = this.convertNeonResultTxids(await new Promise((resolve) => { neonAddon.go(mempoolBuffer, resolve); })); + const { blocks, rates, clusters } = this.convertNeonResultTxids(await new Promise((resolve) => { neonAddon.make(mempoolBuffer, resolve); })); + this.rustInitialized = true; const processed = this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); logger.debug(`RUST makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`); return processed; } catch (e) { + this.rustInitialized = false; logger.err('RUST makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } return this.mempoolBlocks; } public async $rustUpdateBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, added: MempoolTransactionExtended[], removed: MempoolTransactionExtended[], saveResults: boolean = false): Promise { - await this.$rustMakeBlockTemplates(newMempool, saveResults); - return; + if (!this.rustInitialized) { + // need to reset the worker + await this.$rustMakeBlockTemplates(newMempool, saveResults); + return; + } + + const start = Date.now(); + + for (const tx of Object.values(added)) { + this.setUid(tx, true); + } + const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[]; + // serialize relevant mempool data into an ArrayBuffer + // to reduce the overhead of passing this data to the rust thread + const addedBuffer = this.mempoolToArrayBuffer(added, newMempool); + const removedBuffer = this.uidsToArrayBuffer(removedUids); + + // run the block construction algorithm in a separate thread, and wait for a result + try { + const { blocks, rates, clusters } = this.convertNeonResultTxids(await new Promise((resolve) => { neonAddon.update(addedBuffer, removedBuffer, resolve); })); + this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); + logger.debug(`RUST updateBlockTemplates completed in ${(Date.now() - start)/1000} seconds`); + } catch (e) { + this.rustInitialized = false; + logger.err('RUST updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); + } } private processBlockTemplates(mempool, blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }, saveResults): MempoolBlockWithTransactions[] { @@ -487,12 +514,16 @@ class MempoolBlocks { this.nextUid = 1; } - private setUid(tx: MempoolTransactionExtended): number { - const uid = this.nextUid; - this.nextUid++; - this.uidMap.set(uid, tx.txid); - tx.uid = uid; - return uid; + private setUid(tx: MempoolTransactionExtended, skipSet = false): number { + if (tx.uid == null || !skipSet) { + const uid = this.nextUid; + this.nextUid++; + this.uidMap.set(uid, tx.txid); + tx.uid = uid; + return uid; + } else { + return tx.uid; + } } private getUid(tx: MempoolTransactionExtended): number | void { @@ -565,14 +596,14 @@ class MempoolBlocks { return { blocks: convertedBlocks, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }}; } - private mempoolToArrayBuffer(mempool: { [txid: string]: MempoolTransactionExtended }): ArrayBuffer { + private mempoolToArrayBuffer(txs: MempoolTransactionExtended[], mempool: { [txid: string]: MempoolTransactionExtended }): ArrayBuffer { let len = 4; const inputs: { [uid: number]: number[] } = {}; let validCount = 0; - for (const tx of Object.values(mempool)) { + for (const tx of txs) { if (tx.uid != null) { validCount++; - const txInputs = tx.vin.map(v => this.getUid(mempool[v.txid])).filter(uid => uid != null) as number[] + const txInputs = tx.vin.map(v => this.getUid(mempool[v.txid])).filter(uid => uid != null) as number[]; inputs[tx.uid] = txInputs; len += (10 + txInputs.length) * 4; } @@ -581,7 +612,7 @@ class MempoolBlocks { const view = new DataView(buf); view.setUint32(0, validCount, false); let offset = 4; - for (const tx of Object.values(mempool)) { + for (const tx of txs) { if (tx.uid != null) { view.setUint32(offset, tx.uid, false); view.setFloat64(offset + 4, tx.fee, false); @@ -599,6 +630,19 @@ class MempoolBlocks { } return buf; } + + private uidsToArrayBuffer(uids: number[]): ArrayBuffer { + let len = (uids.length + 1) * 4; + const buf = new ArrayBuffer(len); + const view = new DataView(buf); + view.setUint32(0, uids.length, false); + let offset = 4; + for (const uid of uids) { + view.setUint32(offset, uid, false); + offset += 4; + } + return buf; + } } export default new MempoolBlocks();