Pass gbt mempool data directly without serialization
This commit is contained in:
		
							parent
							
								
									860388d9a4
								
							
						
					
					
						commit
						1e1bf14692
					
				
							
								
								
									
										12
									
								
								backend/rust-gbt/index.d.ts
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										12
									
								
								backend/rust-gbt/index.d.ts
									
									
									
									
										vendored
									
									
								
							@ -3,6 +3,14 @@
 | 
			
		||||
 | 
			
		||||
/* auto-generated by NAPI-RS */
 | 
			
		||||
 | 
			
		||||
export interface ThreadTransaction {
 | 
			
		||||
  uid: number
 | 
			
		||||
  fee: number
 | 
			
		||||
  weight: number
 | 
			
		||||
  sigops: number
 | 
			
		||||
  effectiveFeePerVsize: number
 | 
			
		||||
  inputs: Array<number>
 | 
			
		||||
}
 | 
			
		||||
export class GbtGenerator {
 | 
			
		||||
  constructor()
 | 
			
		||||
  /**
 | 
			
		||||
@ -10,13 +18,13 @@ export class GbtGenerator {
 | 
			
		||||
   *
 | 
			
		||||
   * Rejects if the thread panics or if the Mutex is poisoned.
 | 
			
		||||
   */
 | 
			
		||||
  make(mempoolBuffer: Uint8Array): Promise<GbtResult>
 | 
			
		||||
  make(mempool: Array<ThreadTransaction>): Promise<GbtResult>
 | 
			
		||||
  /**
 | 
			
		||||
   * # Errors
 | 
			
		||||
   *
 | 
			
		||||
   * Rejects if the thread panics or if the Mutex is poisoned.
 | 
			
		||||
   */
 | 
			
		||||
  update(newTxs: Uint8Array, removeTxs: Uint8Array): Promise<GbtResult>
 | 
			
		||||
  update(newTxs: Array<ThreadTransaction>, removeTxs: Array<number>): Promise<GbtResult>
 | 
			
		||||
}
 | 
			
		||||
/**
 | 
			
		||||
 * The result from calling the gbt function.
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,5 @@
 | 
			
		||||
use crate::{
 | 
			
		||||
    thread_transaction::ThreadTransaction,
 | 
			
		||||
    u32_hasher_types::{u32hashset_new, U32HasherState},
 | 
			
		||||
    u32_hasher_types::{u32hashset_new, U32HasherState}, ThreadTransaction,
 | 
			
		||||
};
 | 
			
		||||
use std::{
 | 
			
		||||
    cmp::Ordering,
 | 
			
		||||
@ -16,7 +15,6 @@ pub struct AuditTransaction {
 | 
			
		||||
    pub weight: u32,
 | 
			
		||||
    pub sigop_adjusted_vsize: u32,
 | 
			
		||||
    pub sigops: u32,
 | 
			
		||||
    pub fee_per_vsize: f64,
 | 
			
		||||
    pub effective_fee_per_vsize: f64,
 | 
			
		||||
    pub dependency_rate: f64,
 | 
			
		||||
    pub inputs: Vec<u32>,
 | 
			
		||||
@ -81,18 +79,17 @@ impl AuditTransaction {
 | 
			
		||||
        let sigop_adjusted_vsize = ((tx.weight + 3) / 4).max(tx.sigops * 5);
 | 
			
		||||
        Self {
 | 
			
		||||
            uid: tx.uid,
 | 
			
		||||
            fee: tx.fee,
 | 
			
		||||
            fee: tx.fee as u64,
 | 
			
		||||
            weight: tx.weight,
 | 
			
		||||
            sigop_adjusted_vsize,
 | 
			
		||||
            sigops: tx.sigops,
 | 
			
		||||
            fee_per_vsize: tx.fee_per_vsize,
 | 
			
		||||
            effective_fee_per_vsize: tx.effective_fee_per_vsize,
 | 
			
		||||
            dependency_rate: f64::INFINITY,
 | 
			
		||||
            inputs: tx.inputs.clone(),
 | 
			
		||||
            relatives_set_flag: false,
 | 
			
		||||
            ancestors: u32hashset_new(),
 | 
			
		||||
            children: u32hashset_new(),
 | 
			
		||||
            ancestor_fee: tx.fee,
 | 
			
		||||
            ancestor_fee: tx.fee as u64,
 | 
			
		||||
            ancestor_weight: tx.weight,
 | 
			
		||||
            ancestor_sigop_adjusted_vsize: sigop_adjusted_vsize,
 | 
			
		||||
            ancestor_sigops: tx.sigops,
 | 
			
		||||
 | 
			
		||||
@ -6,8 +6,9 @@
 | 
			
		||||
#![allow(clippy::cast_sign_loss)]
 | 
			
		||||
#![allow(clippy::float_cmp)]
 | 
			
		||||
 | 
			
		||||
use napi::bindgen_prelude::{Result, Uint8Array};
 | 
			
		||||
use napi::bindgen_prelude::{Result};
 | 
			
		||||
use napi_derive::napi;
 | 
			
		||||
use thread_transaction::ThreadTransaction;
 | 
			
		||||
use tracing::{debug, info, trace};
 | 
			
		||||
use tracing_log::LogTracer;
 | 
			
		||||
use tracing_subscriber::{EnvFilter, FmtSubscriber};
 | 
			
		||||
@ -19,9 +20,7 @@ mod audit_transaction;
 | 
			
		||||
mod gbt;
 | 
			
		||||
mod thread_transaction;
 | 
			
		||||
mod u32_hasher_types;
 | 
			
		||||
mod utils;
 | 
			
		||||
 | 
			
		||||
use thread_transaction::ThreadTransaction;
 | 
			
		||||
use u32_hasher_types::{u32hashmap_with_capacity, U32HasherState};
 | 
			
		||||
 | 
			
		||||
/// This is the starting capacity for HashMap/Vec/etc. that deal with transactions.
 | 
			
		||||
@ -78,10 +77,10 @@ impl GbtGenerator {
 | 
			
		||||
    ///
 | 
			
		||||
    /// Rejects if the thread panics or if the Mutex is poisoned.
 | 
			
		||||
    #[napi]
 | 
			
		||||
    pub async fn make(&self, mempool_buffer: Uint8Array) -> Result<GbtResult> {
 | 
			
		||||
    pub async fn make(&self, mempool: Vec<ThreadTransaction>) -> Result<GbtResult> {
 | 
			
		||||
        trace!("make: Current State {:#?}", self.thread_transactions);
 | 
			
		||||
        run_task(Arc::clone(&self.thread_transactions), move |map| {
 | 
			
		||||
            for tx in ThreadTransaction::batch_from_buffer(&mempool_buffer) {
 | 
			
		||||
            for tx in mempool {
 | 
			
		||||
                map.insert(tx.uid, tx);
 | 
			
		||||
            }
 | 
			
		||||
        })
 | 
			
		||||
@ -92,13 +91,13 @@ impl GbtGenerator {
 | 
			
		||||
    ///
 | 
			
		||||
    /// Rejects if the thread panics or if the Mutex is poisoned.
 | 
			
		||||
    #[napi]
 | 
			
		||||
    pub async fn update(&self, new_txs: Uint8Array, remove_txs: Uint8Array) -> Result<GbtResult> {
 | 
			
		||||
    pub async fn update(&self, new_txs: Vec<ThreadTransaction>, remove_txs: Vec<u32>) -> Result<GbtResult> {
 | 
			
		||||
        trace!("update: Current State {:#?}", self.thread_transactions);
 | 
			
		||||
        run_task(Arc::clone(&self.thread_transactions), move |map| {
 | 
			
		||||
            for tx in ThreadTransaction::batch_from_buffer(&new_txs) {
 | 
			
		||||
            for tx in new_txs {
 | 
			
		||||
                map.insert(tx.uid, tx);
 | 
			
		||||
            }
 | 
			
		||||
            for txid in &utils::txids_from_buffer(&remove_txs) {
 | 
			
		||||
            for txid in &remove_txs {
 | 
			
		||||
                map.remove(txid);
 | 
			
		||||
            }
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
@ -1,45 +1,12 @@
 | 
			
		||||
use bytes::buf::Buf;
 | 
			
		||||
use std::io::Cursor;
 | 
			
		||||
use napi_derive::napi;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
#[napi(object)]
 | 
			
		||||
pub struct ThreadTransaction {
 | 
			
		||||
    pub uid: u32,
 | 
			
		||||
    pub fee: u64,
 | 
			
		||||
    pub fee: f64,
 | 
			
		||||
    pub weight: u32,
 | 
			
		||||
    pub sigops: u32,
 | 
			
		||||
    pub fee_per_vsize: f64,
 | 
			
		||||
    pub effective_fee_per_vsize: f64,
 | 
			
		||||
    pub inputs: Vec<u32>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ThreadTransaction {
 | 
			
		||||
    pub fn batch_from_buffer(buffer: &[u8]) -> Vec<Self> {
 | 
			
		||||
        let mut transactions: Vec<Self> = Vec::new();
 | 
			
		||||
        let mut cursor = Cursor::new(buffer);
 | 
			
		||||
        let size = cursor.get_u32();
 | 
			
		||||
        for _ in 0..size {
 | 
			
		||||
            let uid = cursor.get_u32();
 | 
			
		||||
            let fee = cursor.get_f64().round() as u64;
 | 
			
		||||
            let weight = cursor.get_u32();
 | 
			
		||||
            let sigops = cursor.get_u32();
 | 
			
		||||
            let fee_per_vsize = cursor.get_f64();
 | 
			
		||||
            let effective_fee_per_vsize = cursor.get_f64();
 | 
			
		||||
            let input_count = cursor.get_u32();
 | 
			
		||||
            let mut inputs: Vec<u32> = Vec::new();
 | 
			
		||||
            for _ in 0..input_count {
 | 
			
		||||
                inputs.push(cursor.get_u32());
 | 
			
		||||
            }
 | 
			
		||||
            transactions.push(Self {
 | 
			
		||||
                uid,
 | 
			
		||||
                fee,
 | 
			
		||||
                weight,
 | 
			
		||||
                sigops,
 | 
			
		||||
                fee_per_vsize,
 | 
			
		||||
                effective_fee_per_vsize,
 | 
			
		||||
                inputs,
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        transactions
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
@ -1,13 +0,0 @@
 | 
			
		||||
use bytes::buf::Buf;
 | 
			
		||||
use std::io::Cursor;
 | 
			
		||||
 | 
			
		||||
pub fn txids_from_buffer(buffer: &[u8]) -> Vec<u32> {
 | 
			
		||||
    let mut txids: Vec<u32> = Vec::new();
 | 
			
		||||
    let mut cursor: Cursor<&[u8]> = Cursor::new(buffer);
 | 
			
		||||
    let size: u32 = cursor.get_u32();
 | 
			
		||||
    for _ in 0..size {
 | 
			
		||||
        txids.push(cursor.get_u32());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    txids
 | 
			
		||||
}
 | 
			
		||||
@ -1,6 +1,7 @@
 | 
			
		||||
import fs from 'fs';
 | 
			
		||||
import { GbtGenerator } from '../../../rust-gbt';
 | 
			
		||||
import path from 'path';
 | 
			
		||||
import { CompactThreadTransaction } from '../../mempool.interfaces';
 | 
			
		||||
 | 
			
		||||
const baseline = require('./test-data/target-template.json');
 | 
			
		||||
const testVector = require('./test-data/test-data-ids.json');
 | 
			
		||||
@ -9,12 +10,13 @@ const vectorTxidMap: Map<string, number>  = new Map(testVector.map(x => [x[1], x
 | 
			
		||||
// Note that this test buffer is specially constructed
 | 
			
		||||
// such that uids are assigned in numerical txid order
 | 
			
		||||
// so that ties break the same way as in Core's implementation
 | 
			
		||||
const vectorBuffer: ArrayBuffer = fs.readFileSync(path.join(__dirname, './', './test-data/test-buffer.bin'));
 | 
			
		||||
const vectorBuffer: Buffer = fs.readFileSync(path.join(__dirname, './', './test-data/test-buffer.bin'));
 | 
			
		||||
 | 
			
		||||
describe('Rust GBT', () => {
 | 
			
		||||
  test('should produce the same template as getBlockTemplate from Bitcoin Core', async () => {
 | 
			
		||||
    const rustGbt = new GbtGenerator();
 | 
			
		||||
    const result = await rustGbt.make(new Uint8Array(vectorBuffer));
 | 
			
		||||
    const mempool = mempoolFromArrayBuffer(vectorBuffer.buffer);
 | 
			
		||||
    const result = await rustGbt.make(mempool);
 | 
			
		||||
 | 
			
		||||
    const blocks: [string, number][][] = result.blocks.map(block => {
 | 
			
		||||
      return block.map(uid => [vectorUidMap.get(uid) || 'missing', uid]);
 | 
			
		||||
@ -25,3 +27,29 @@ describe('Rust GBT', () => {
 | 
			
		||||
    expect(blocks[0]).toEqual(template);
 | 
			
		||||
  });
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
function mempoolFromArrayBuffer(buf: ArrayBuffer): CompactThreadTransaction[] {
 | 
			
		||||
  const view = new DataView(buf);
 | 
			
		||||
  const count = view.getUint32(0, false);
 | 
			
		||||
  const txs: CompactThreadTransaction[] = [];
 | 
			
		||||
  let offset = 4;
 | 
			
		||||
  for (let i = 0; i < count; i++) {
 | 
			
		||||
    const tx: CompactThreadTransaction = {
 | 
			
		||||
      uid: view.getUint32(offset, false),
 | 
			
		||||
      fee: view.getFloat64(offset + 4, false),
 | 
			
		||||
      weight: view.getUint32(offset + 12, false),
 | 
			
		||||
      sigops: view.getUint32(offset + 16, false),
 | 
			
		||||
      feePerVsize: view.getFloat64(offset + 20, false),
 | 
			
		||||
      effectiveFeePerVsize: view.getFloat64(offset + 28, false),
 | 
			
		||||
      inputs: [],
 | 
			
		||||
    };
 | 
			
		||||
    const numInputs = view.getUint32(offset + 36, false);
 | 
			
		||||
    offset += 40;
 | 
			
		||||
    for (let j = 0; j < numInputs; j++) {
 | 
			
		||||
      tx.inputs.push(view.getUint32(offset, false));
 | 
			
		||||
      offset += 4;
 | 
			
		||||
    }
 | 
			
		||||
    txs.push(tx);
 | 
			
		||||
  }
 | 
			
		||||
  return txs;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,4 +1,4 @@
 | 
			
		||||
import { GbtGenerator } from '../../rust-gbt';
 | 
			
		||||
import { GbtGenerator, ThreadTransaction as RustThreadTransaction } from '../../rust-gbt';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import { MempoolBlock, MempoolTransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction, EffectiveFeeStats } from '../mempool.interfaces';
 | 
			
		||||
import { Common, OnlineFeeStatsCalculator } from './common';
 | 
			
		||||
@ -341,16 +341,16 @@ class MempoolBlocks {
 | 
			
		||||
    for (const tx of Object.values(newMempool)) {
 | 
			
		||||
      this.setUid(tx, !saveResults);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // serialize relevant mempool data into an ArrayBuffer
 | 
			
		||||
    // to reduce the overhead of passing this data to the rust thread
 | 
			
		||||
    const mempoolBuffer = this.mempoolToArrayBuffer(Object.values(newMempool), newMempool);
 | 
			
		||||
    // set short ids for transaction inputs
 | 
			
		||||
    for (const tx of Object.values(newMempool)) {
 | 
			
		||||
      tx.inputs = tx.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => (uid !== null && uid !== undefined)) as number[];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // run the block construction algorithm in a separate thread, and wait for a result
 | 
			
		||||
    const rustGbt = saveResults ? this.rustGbtGenerator : new GbtGenerator();
 | 
			
		||||
    try {
 | 
			
		||||
      const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
 | 
			
		||||
        await rustGbt.make(new Uint8Array(mempoolBuffer)),
 | 
			
		||||
        await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[]),
 | 
			
		||||
      );
 | 
			
		||||
      if (saveResults) {
 | 
			
		||||
        this.rustInitialized = true;
 | 
			
		||||
@ -383,22 +383,22 @@ class MempoolBlocks {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const start = Date.now();
 | 
			
		||||
 | 
			
		||||
    for (const tx of Object.values(added)) {
 | 
			
		||||
    // set missing short ids
 | 
			
		||||
    for (const tx of added) {
 | 
			
		||||
      this.setUid(tx, true);
 | 
			
		||||
    }
 | 
			
		||||
    // set short ids for transaction inputs
 | 
			
		||||
    for (const tx of added) {
 | 
			
		||||
      tx.inputs = tx.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => (uid !== null && uid !== undefined)) as number[];
 | 
			
		||||
    }
 | 
			
		||||
    const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => (uid !== null && uid !== undefined)) as number[];
 | 
			
		||||
    // serialize relevant mempool data into an ArrayBuffer
 | 
			
		||||
    // to reduce the overhead of passing this data to the rust thread
 | 
			
		||||
    const addedBuffer = this.mempoolToArrayBuffer(added, newMempool);
 | 
			
		||||
    const removedBuffer = this.uidsToArrayBuffer(removedUids);
 | 
			
		||||
 | 
			
		||||
    // run the block construction algorithm in a separate thread, and wait for a result
 | 
			
		||||
    try {
 | 
			
		||||
      const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
 | 
			
		||||
        await this.rustGbtGenerator.update(
 | 
			
		||||
            new Uint8Array(addedBuffer),
 | 
			
		||||
            new Uint8Array(removedBuffer),
 | 
			
		||||
          added as RustThreadTransaction[],
 | 
			
		||||
          removedUids,
 | 
			
		||||
        ),
 | 
			
		||||
      );
 | 
			
		||||
      const expectedMempoolSize = Object.keys(newMempool).length;
 | 
			
		||||
@ -646,54 +646,6 @@ class MempoolBlocks {
 | 
			
		||||
    }
 | 
			
		||||
    return { blocks: convertedBlocks, blockWeights, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], blockWeights: number[], rates: { [root: string]: number }, clusters: { [root: string]: string[] }};
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private mempoolToArrayBuffer(txs: MempoolTransactionExtended[], mempool: { [txid: string]: MempoolTransactionExtended }): ArrayBuffer {
 | 
			
		||||
    let len = 4;
 | 
			
		||||
    const inputs: { [uid: number]: number[] } = {};
 | 
			
		||||
    let validCount = 0;
 | 
			
		||||
    for (const tx of txs) {
 | 
			
		||||
      if (tx.uid !== null && tx.uid !== undefined) {
 | 
			
		||||
        validCount++;
 | 
			
		||||
        const txInputs = tx.vin.map(v => this.getUid(mempool[v.txid])).filter(uid => (uid !== null && uid !== undefined)) as number[];
 | 
			
		||||
        inputs[tx.uid] = txInputs;
 | 
			
		||||
        len += (10 + txInputs.length) * 4;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    const buf = new ArrayBuffer(len);
 | 
			
		||||
    const view = new DataView(buf);
 | 
			
		||||
    view.setUint32(0, validCount, false);
 | 
			
		||||
    let offset = 4;
 | 
			
		||||
    for (const tx of txs) {
 | 
			
		||||
      if (tx.uid !== null && tx.uid !== undefined) {
 | 
			
		||||
        view.setUint32(offset, tx.uid, false);
 | 
			
		||||
        view.setFloat64(offset + 4, tx.fee, false);
 | 
			
		||||
        view.setUint32(offset + 12, tx.weight, false);
 | 
			
		||||
        view.setUint32(offset + 16, tx.sigops, false);
 | 
			
		||||
        view.setFloat64(offset + 20, (tx.adjustedFeePerVsize || tx.feePerVsize), false);
 | 
			
		||||
        view.setFloat64(offset + 28, (tx.effectiveFeePerVsize || tx.adjustedFeePerVsize || tx.feePerVsize), false);
 | 
			
		||||
        view.setUint32(offset + 36, inputs[tx.uid].length, false);
 | 
			
		||||
        offset += 40;
 | 
			
		||||
        for (const input of inputs[tx.uid]) {
 | 
			
		||||
          view.setUint32(offset, input, false);
 | 
			
		||||
          offset += 4;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    return buf;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private uidsToArrayBuffer(uids: number[]): ArrayBuffer {
 | 
			
		||||
    const len = (uids.length + 1) * 4;
 | 
			
		||||
    const buf = new ArrayBuffer(len);
 | 
			
		||||
    const view = new DataView(buf);
 | 
			
		||||
    view.setUint32(0, uids.length, false);
 | 
			
		||||
    let offset = 4;
 | 
			
		||||
    for (const uid of uids) {
 | 
			
		||||
      view.setUint32(offset, uid, false);
 | 
			
		||||
      offset += 4;
 | 
			
		||||
    }
 | 
			
		||||
    return buf;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default new MempoolBlocks();
 | 
			
		||||
 | 
			
		||||
@ -96,6 +96,7 @@ export interface MempoolTransactionExtended extends TransactionExtended {
 | 
			
		||||
  sigops: number;
 | 
			
		||||
  adjustedVsize: number;
 | 
			
		||||
  adjustedFeePerVsize: number;
 | 
			
		||||
  inputs?: number[];
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface AuditTransaction {
 | 
			
		||||
@ -125,7 +126,7 @@ export interface CompactThreadTransaction {
 | 
			
		||||
  weight: number;
 | 
			
		||||
  sigops: number;
 | 
			
		||||
  feePerVsize: number;
 | 
			
		||||
  effectiveFeePerVsize?: number;
 | 
			
		||||
  effectiveFeePerVsize: number;
 | 
			
		||||
  inputs: number[];
 | 
			
		||||
  cpfpRoot?: number;
 | 
			
		||||
  cpfpChecked?: boolean;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user