Pass gbt mempool data directly without serialization

This commit is contained in:
Mononaut 2023-06-28 16:31:59 -04:00
parent 5065fa42d0
commit 0886e850f9
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
8 changed files with 70 additions and 131 deletions

View File

@ -3,6 +3,14 @@
/* auto-generated by NAPI-RS */ /* auto-generated by NAPI-RS */
export interface ThreadTransaction {
uid: number
fee: number
weight: number
sigops: number
effectiveFeePerVsize: number
inputs: Array<number>
}
export class GbtGenerator { export class GbtGenerator {
constructor() constructor()
/** /**
@ -10,13 +18,13 @@ export class GbtGenerator {
* *
* Rejects if the thread panics or if the Mutex is poisoned. * Rejects if the thread panics or if the Mutex is poisoned.
*/ */
make(mempoolBuffer: Uint8Array): Promise<GbtResult> make(mempool: Array<ThreadTransaction>): Promise<GbtResult>
/** /**
* # Errors * # Errors
* *
* Rejects if the thread panics or if the Mutex is poisoned. * Rejects if the thread panics or if the Mutex is poisoned.
*/ */
update(newTxs: Uint8Array, removeTxs: Uint8Array): Promise<GbtResult> update(newTxs: Array<ThreadTransaction>, removeTxs: Array<number>): Promise<GbtResult>
} }
/** /**
* The result from calling the gbt function. * The result from calling the gbt function.

View File

@ -1,6 +1,5 @@
use crate::{ use crate::{
thread_transaction::ThreadTransaction, u32_hasher_types::{u32hashset_new, U32HasherState}, ThreadTransaction,
u32_hasher_types::{u32hashset_new, U32HasherState},
}; };
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
@ -16,7 +15,6 @@ pub struct AuditTransaction {
pub weight: u32, pub weight: u32,
pub sigop_adjusted_vsize: u32, pub sigop_adjusted_vsize: u32,
pub sigops: u32, pub sigops: u32,
pub fee_per_vsize: f64,
pub effective_fee_per_vsize: f64, pub effective_fee_per_vsize: f64,
pub dependency_rate: f64, pub dependency_rate: f64,
pub inputs: Vec<u32>, pub inputs: Vec<u32>,
@ -81,18 +79,17 @@ impl AuditTransaction {
let sigop_adjusted_vsize = ((tx.weight + 3) / 4).max(tx.sigops * 5); let sigop_adjusted_vsize = ((tx.weight + 3) / 4).max(tx.sigops * 5);
Self { Self {
uid: tx.uid, uid: tx.uid,
fee: tx.fee, fee: tx.fee as u64,
weight: tx.weight, weight: tx.weight,
sigop_adjusted_vsize, sigop_adjusted_vsize,
sigops: tx.sigops, sigops: tx.sigops,
fee_per_vsize: tx.fee_per_vsize,
effective_fee_per_vsize: tx.effective_fee_per_vsize, effective_fee_per_vsize: tx.effective_fee_per_vsize,
dependency_rate: f64::INFINITY, dependency_rate: f64::INFINITY,
inputs: tx.inputs.clone(), inputs: tx.inputs.clone(),
relatives_set_flag: false, relatives_set_flag: false,
ancestors: u32hashset_new(), ancestors: u32hashset_new(),
children: u32hashset_new(), children: u32hashset_new(),
ancestor_fee: tx.fee, ancestor_fee: tx.fee as u64,
ancestor_weight: tx.weight, ancestor_weight: tx.weight,
ancestor_sigop_adjusted_vsize: sigop_adjusted_vsize, ancestor_sigop_adjusted_vsize: sigop_adjusted_vsize,
ancestor_sigops: tx.sigops, ancestor_sigops: tx.sigops,

View File

@ -6,8 +6,9 @@
#![allow(clippy::cast_sign_loss)] #![allow(clippy::cast_sign_loss)]
#![allow(clippy::float_cmp)] #![allow(clippy::float_cmp)]
use napi::bindgen_prelude::{Result, Uint8Array}; use napi::bindgen_prelude::{Result};
use napi_derive::napi; use napi_derive::napi;
use thread_transaction::ThreadTransaction;
use tracing::{debug, info, trace}; use tracing::{debug, info, trace};
use tracing_log::LogTracer; use tracing_log::LogTracer;
use tracing_subscriber::{EnvFilter, FmtSubscriber}; use tracing_subscriber::{EnvFilter, FmtSubscriber};
@ -19,9 +20,7 @@ mod audit_transaction;
mod gbt; mod gbt;
mod thread_transaction; mod thread_transaction;
mod u32_hasher_types; mod u32_hasher_types;
mod utils;
use thread_transaction::ThreadTransaction;
use u32_hasher_types::{u32hashmap_with_capacity, U32HasherState}; use u32_hasher_types::{u32hashmap_with_capacity, U32HasherState};
/// This is the starting capacity for HashMap/Vec/etc. that deal with transactions. /// This is the starting capacity for HashMap/Vec/etc. that deal with transactions.
@ -78,10 +77,10 @@ impl GbtGenerator {
/// ///
/// Rejects if the thread panics or if the Mutex is poisoned. /// Rejects if the thread panics or if the Mutex is poisoned.
#[napi] #[napi]
pub async fn make(&self, mempool_buffer: Uint8Array) -> Result<GbtResult> { pub async fn make(&self, mempool: Vec<ThreadTransaction>) -> Result<GbtResult> {
trace!("make: Current State {:#?}", self.thread_transactions); trace!("make: Current State {:#?}", self.thread_transactions);
run_task(Arc::clone(&self.thread_transactions), move |map| { run_task(Arc::clone(&self.thread_transactions), move |map| {
for tx in ThreadTransaction::batch_from_buffer(&mempool_buffer) { for tx in mempool {
map.insert(tx.uid, tx); map.insert(tx.uid, tx);
} }
}) })
@ -92,13 +91,13 @@ impl GbtGenerator {
/// ///
/// Rejects if the thread panics or if the Mutex is poisoned. /// Rejects if the thread panics or if the Mutex is poisoned.
#[napi] #[napi]
pub async fn update(&self, new_txs: Uint8Array, remove_txs: Uint8Array) -> Result<GbtResult> { pub async fn update(&self, new_txs: Vec<ThreadTransaction>, remove_txs: Vec<u32>) -> Result<GbtResult> {
trace!("update: Current State {:#?}", self.thread_transactions); trace!("update: Current State {:#?}", self.thread_transactions);
run_task(Arc::clone(&self.thread_transactions), move |map| { run_task(Arc::clone(&self.thread_transactions), move |map| {
for tx in ThreadTransaction::batch_from_buffer(&new_txs) { for tx in new_txs {
map.insert(tx.uid, tx); map.insert(tx.uid, tx);
} }
for txid in &utils::txids_from_buffer(&remove_txs) { for txid in &remove_txs {
map.remove(txid); map.remove(txid);
} }
}) })

View File

@ -1,45 +1,12 @@
use bytes::buf::Buf; use napi_derive::napi;
use std::io::Cursor;
#[derive(Debug)] #[derive(Debug)]
#[napi(object)]
pub struct ThreadTransaction { pub struct ThreadTransaction {
pub uid: u32, pub uid: u32,
pub fee: u64, pub fee: f64,
pub weight: u32, pub weight: u32,
pub sigops: u32, pub sigops: u32,
pub fee_per_vsize: f64,
pub effective_fee_per_vsize: f64, pub effective_fee_per_vsize: f64,
pub inputs: Vec<u32>, pub inputs: Vec<u32>,
} }
impl ThreadTransaction {
pub fn batch_from_buffer(buffer: &[u8]) -> Vec<Self> {
let mut transactions: Vec<Self> = Vec::new();
let mut cursor = Cursor::new(buffer);
let size = cursor.get_u32();
for _ in 0..size {
let uid = cursor.get_u32();
let fee = cursor.get_f64().round() as u64;
let weight = cursor.get_u32();
let sigops = cursor.get_u32();
let fee_per_vsize = cursor.get_f64();
let effective_fee_per_vsize = cursor.get_f64();
let input_count = cursor.get_u32();
let mut inputs: Vec<u32> = Vec::new();
for _ in 0..input_count {
inputs.push(cursor.get_u32());
}
transactions.push(Self {
uid,
fee,
weight,
sigops,
fee_per_vsize,
effective_fee_per_vsize,
inputs,
});
}
transactions
}
}

View File

@ -1,13 +0,0 @@
use bytes::buf::Buf;
use std::io::Cursor;
pub fn txids_from_buffer(buffer: &[u8]) -> Vec<u32> {
let mut txids: Vec<u32> = Vec::new();
let mut cursor: Cursor<&[u8]> = Cursor::new(buffer);
let size: u32 = cursor.get_u32();
for _ in 0..size {
txids.push(cursor.get_u32());
}
txids
}

View File

@ -1,6 +1,7 @@
import fs from 'fs'; import fs from 'fs';
import { GbtGenerator } from '../../../rust-gbt'; import { GbtGenerator } from '../../../rust-gbt';
import path from 'path'; import path from 'path';
import { CompactThreadTransaction } from '../../mempool.interfaces';
const baseline = require('./test-data/target-template.json'); const baseline = require('./test-data/target-template.json');
const testVector = require('./test-data/test-data-ids.json'); const testVector = require('./test-data/test-data-ids.json');
@ -9,12 +10,13 @@ const vectorTxidMap: Map<string, number> = new Map(testVector.map(x => [x[1], x
// Note that this test buffer is specially constructed // Note that this test buffer is specially constructed
// such that uids are assigned in numerical txid order // such that uids are assigned in numerical txid order
// so that ties break the same way as in Core's implementation // so that ties break the same way as in Core's implementation
const vectorBuffer: ArrayBuffer = fs.readFileSync(path.join(__dirname, './', './test-data/test-buffer.bin')); const vectorBuffer: Buffer = fs.readFileSync(path.join(__dirname, './', './test-data/test-buffer.bin'));
describe('Rust GBT', () => { describe('Rust GBT', () => {
test('should produce the same template as getBlockTemplate from Bitcoin Core', async () => { test('should produce the same template as getBlockTemplate from Bitcoin Core', async () => {
const rustGbt = new GbtGenerator(); const rustGbt = new GbtGenerator();
const result = await rustGbt.make(new Uint8Array(vectorBuffer)); const mempool = mempoolFromArrayBuffer(vectorBuffer.buffer);
const result = await rustGbt.make(mempool);
const blocks: [string, number][][] = result.blocks.map(block => { const blocks: [string, number][][] = result.blocks.map(block => {
return block.map(uid => [vectorUidMap.get(uid) || 'missing', uid]); return block.map(uid => [vectorUidMap.get(uid) || 'missing', uid]);
@ -25,3 +27,29 @@ describe('Rust GBT', () => {
expect(blocks[0]).toEqual(template); expect(blocks[0]).toEqual(template);
}); });
}); });
function mempoolFromArrayBuffer(buf: ArrayBuffer): CompactThreadTransaction[] {
const view = new DataView(buf);
const count = view.getUint32(0, false);
const txs: CompactThreadTransaction[] = [];
let offset = 4;
for (let i = 0; i < count; i++) {
const tx: CompactThreadTransaction = {
uid: view.getUint32(offset, false),
fee: view.getFloat64(offset + 4, false),
weight: view.getUint32(offset + 12, false),
sigops: view.getUint32(offset + 16, false),
feePerVsize: view.getFloat64(offset + 20, false),
effectiveFeePerVsize: view.getFloat64(offset + 28, false),
inputs: [],
};
const numInputs = view.getUint32(offset + 36, false);
offset += 40;
for (let j = 0; j < numInputs; j++) {
tx.inputs.push(view.getUint32(offset, false));
offset += 4;
}
txs.push(tx);
}
return txs;
}

View File

@ -1,4 +1,4 @@
import { GbtGenerator } from '../../rust-gbt'; import { GbtGenerator, ThreadTransaction as RustThreadTransaction } from '../../rust-gbt';
import logger from '../logger'; import logger from '../logger';
import { MempoolBlock, MempoolTransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction, EffectiveFeeStats } from '../mempool.interfaces'; import { MempoolBlock, MempoolTransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction, EffectiveFeeStats } from '../mempool.interfaces';
import { Common, OnlineFeeStatsCalculator } from './common'; import { Common, OnlineFeeStatsCalculator } from './common';
@ -341,16 +341,16 @@ class MempoolBlocks {
for (const tx of Object.values(newMempool)) { for (const tx of Object.values(newMempool)) {
this.setUid(tx, !saveResults); this.setUid(tx, !saveResults);
} }
// set short ids for transaction inputs
// serialize relevant mempool data into an ArrayBuffer for (const tx of Object.values(newMempool)) {
// to reduce the overhead of passing this data to the rust thread tx.inputs = tx.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => (uid !== null && uid !== undefined)) as number[];
const mempoolBuffer = this.mempoolToArrayBuffer(Object.values(newMempool), newMempool); }
// run the block construction algorithm in a separate thread, and wait for a result // run the block construction algorithm in a separate thread, and wait for a result
const rustGbt = saveResults ? this.rustGbtGenerator : new GbtGenerator(); const rustGbt = saveResults ? this.rustGbtGenerator : new GbtGenerator();
try { try {
const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids( const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
await rustGbt.make(new Uint8Array(mempoolBuffer)), await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[]),
); );
if (saveResults) { if (saveResults) {
this.rustInitialized = true; this.rustInitialized = true;
@ -383,22 +383,22 @@ class MempoolBlocks {
} }
const start = Date.now(); const start = Date.now();
// set missing short ids
for (const tx of Object.values(added)) { for (const tx of added) {
this.setUid(tx, true); this.setUid(tx, true);
} }
// set short ids for transaction inputs
for (const tx of added) {
tx.inputs = tx.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => (uid !== null && uid !== undefined)) as number[];
}
const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => (uid !== null && uid !== undefined)) as number[]; const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => (uid !== null && uid !== undefined)) as number[];
// serialize relevant mempool data into an ArrayBuffer
// to reduce the overhead of passing this data to the rust thread
const addedBuffer = this.mempoolToArrayBuffer(added, newMempool);
const removedBuffer = this.uidsToArrayBuffer(removedUids);
// run the block construction algorithm in a separate thread, and wait for a result // run the block construction algorithm in a separate thread, and wait for a result
try { try {
const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids( const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
await this.rustGbtGenerator.update( await this.rustGbtGenerator.update(
new Uint8Array(addedBuffer), added as RustThreadTransaction[],
new Uint8Array(removedBuffer), removedUids,
), ),
); );
const expectedMempoolSize = Object.keys(newMempool).length; const expectedMempoolSize = Object.keys(newMempool).length;
@ -646,54 +646,6 @@ class MempoolBlocks {
} }
return { blocks: convertedBlocks, blockWeights, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], blockWeights: number[], rates: { [root: string]: number }, clusters: { [root: string]: string[] }}; return { blocks: convertedBlocks, blockWeights, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], blockWeights: number[], rates: { [root: string]: number }, clusters: { [root: string]: string[] }};
} }
private mempoolToArrayBuffer(txs: MempoolTransactionExtended[], mempool: { [txid: string]: MempoolTransactionExtended }): ArrayBuffer {
let len = 4;
const inputs: { [uid: number]: number[] } = {};
let validCount = 0;
for (const tx of txs) {
if (tx.uid !== null && tx.uid !== undefined) {
validCount++;
const txInputs = tx.vin.map(v => this.getUid(mempool[v.txid])).filter(uid => (uid !== null && uid !== undefined)) as number[];
inputs[tx.uid] = txInputs;
len += (10 + txInputs.length) * 4;
}
}
const buf = new ArrayBuffer(len);
const view = new DataView(buf);
view.setUint32(0, validCount, false);
let offset = 4;
for (const tx of txs) {
if (tx.uid !== null && tx.uid !== undefined) {
view.setUint32(offset, tx.uid, false);
view.setFloat64(offset + 4, tx.fee, false);
view.setUint32(offset + 12, tx.weight, false);
view.setUint32(offset + 16, tx.sigops, false);
view.setFloat64(offset + 20, (tx.adjustedFeePerVsize || tx.feePerVsize), false);
view.setFloat64(offset + 28, (tx.effectiveFeePerVsize || tx.adjustedFeePerVsize || tx.feePerVsize), false);
view.setUint32(offset + 36, inputs[tx.uid].length, false);
offset += 40;
for (const input of inputs[tx.uid]) {
view.setUint32(offset, input, false);
offset += 4;
}
}
}
return buf;
}
private uidsToArrayBuffer(uids: number[]): ArrayBuffer {
const len = (uids.length + 1) * 4;
const buf = new ArrayBuffer(len);
const view = new DataView(buf);
view.setUint32(0, uids.length, false);
let offset = 4;
for (const uid of uids) {
view.setUint32(offset, uid, false);
offset += 4;
}
return buf;
}
} }
export default new MempoolBlocks(); export default new MempoolBlocks();

View File

@ -96,6 +96,7 @@ export interface MempoolTransactionExtended extends TransactionExtended {
sigops: number; sigops: number;
adjustedVsize: number; adjustedVsize: number;
adjustedFeePerVsize: number; adjustedFeePerVsize: number;
inputs?: number[];
} }
export interface AuditTransaction { export interface AuditTransaction {
@ -125,7 +126,7 @@ export interface CompactThreadTransaction {
weight: number; weight: number;
sigops: number; sigops: number;
feePerVsize: number; feePerVsize: number;
effectiveFeePerVsize?: number; effectiveFeePerVsize: number;
inputs: number[]; inputs: number[];
cpfpRoot?: number; cpfpRoot?: number;
cpfpChecked?: boolean; cpfpChecked?: boolean;