Merge pull request #4554 from mempool/mononaut/unmineable-txs
Handle unmineable transactions in GBT implementations
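This change threads a new `overflow` list through both GBT (getBlockTemplate) implementations: transactions that could not be placed in any projected block are now returned to the caller instead of silently disappearing from the result, the Rust selection loop is restructured so it terminates cleanly when only unmineable packages remain, and the mempool-size consistency check counts overflowed transactions so they no longer trip the cache-out-of-sync error.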
Commit aad288c0d9

backend/rust-gbt/index.d.ts (3 changes, vendored):
```diff
@@ -45,5 +45,6 @@ export class GbtResult {
   blockWeights: Array<number>
   clusters: Array<Array<number>>
   rates: Array<Array<number>>
-  constructor(blocks: Array<Array<number>>, blockWeights: Array<number>, clusters: Array<Array<number>>, rates: Array<Array<number>>)
+  overflow: Array<number>
+  constructor(blocks: Array<Array<number>>, blockWeights: Array<number>, clusters: Array<Array<number>>, rates: Array<Array<number>>, overflow: Array<number>)
 }
```
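The `overflow` array rides along as a new field and fifth constructor argument in the vendored N-API declaration above. The Rust generator it describes changes next: a log line reports the working-vector capacity, and the main selection loop is restructured.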
```diff
@@ -60,6 +60,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap, accelerations: &[ThreadAccelerat
         indexed_accelerations[acceleration.uid as usize] = Some(acceleration);
     }
 
+    info!("Initializing working vecs with uid capacity for {}", max_uid + 1);
     let mempool_len = mempool.len();
     let mut audit_pool: AuditPool = Vec::with_capacity(max_uid + 1);
     audit_pool.resize(max_uid + 1, None);
```
```diff
@@ -127,74 +128,75 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap, accelerations: &[ThreadAccelerat
         let next_from_stack = next_valid_from_stack(&mut mempool_stack, &audit_pool);
         let next_from_queue = next_valid_from_queue(&mut modified, &audit_pool);
         if next_from_stack.is_none() && next_from_queue.is_none() {
-            continue;
-        }
-        let (next_tx, from_stack) = match (next_from_stack, next_from_queue) {
-            (Some(stack_tx), Some(queue_tx)) => match queue_tx.cmp(stack_tx) {
-                std::cmp::Ordering::Less => (stack_tx, true),
-                _ => (queue_tx, false),
-            },
-            (Some(stack_tx), None) => (stack_tx, true),
-            (None, Some(queue_tx)) => (queue_tx, false),
-            (None, None) => unreachable!(),
-        };
-
-        if from_stack {
-            mempool_stack.pop();
-        } else {
-            modified.pop();
-        }
-
-        if blocks.len() < (MAX_BLOCKS - 1)
-            && ((block_weight + (4 * next_tx.ancestor_sigop_adjusted_vsize())
-                >= MAX_BLOCK_WEIGHT_UNITS)
-                || (block_sigops + next_tx.ancestor_sigops() > BLOCK_SIGOPS))
-        {
-            // hold this package in an overflow list while we check for smaller options
-            overflow.push(next_tx.uid);
-            failures += 1;
-        } else {
-            let mut package: Vec<(u32, u32, usize)> = Vec::new();
-            let mut cluster: Vec<u32> = Vec::new();
-            let is_cluster: bool = !next_tx.ancestors.is_empty();
-            for ancestor_id in &next_tx.ancestors {
-                if let Some(Some(ancestor)) = audit_pool.get(*ancestor_id as usize) {
-                    package.push((*ancestor_id, ancestor.order(), ancestor.ancestors.len()));
-                }
-            }
-            package.sort_unstable_by(|a, b| -> Ordering {
-                if a.2 != b.2 {
-                    // order by ascending ancestor count
-                    a.2.cmp(&b.2)
-                } else if a.1 != b.1 {
-                    // tie-break by ascending partial txid
-                    a.1.cmp(&b.1)
-                } else {
-                    // tie-break partial txid collisions by ascending uid
-                    a.0.cmp(&b.0)
-                }
-            });
-            package.push((next_tx.uid, next_tx.order(), next_tx.ancestors.len()));
-
-            let cluster_rate = next_tx.cluster_rate();
-
-            for (txid, _, _) in &package {
-                cluster.push(*txid);
-                if let Some(Some(tx)) = audit_pool.get_mut(*txid as usize) {
-                    tx.used = true;
-                    tx.set_dirty_if_different(cluster_rate);
-                    transactions.push(tx.uid);
-                    block_weight += tx.weight;
-                    block_sigops += tx.sigops;
-                }
-                update_descendants(*txid, &mut audit_pool, &mut modified, cluster_rate);
-            }
-
-            if is_cluster {
-                clusters.push(cluster);
-            }
-
-            failures = 0;
+            info!("No transactions left! {:#?} in overflow", overflow.len());
+        } else {
+            let (next_tx, from_stack) = match (next_from_stack, next_from_queue) {
+                (Some(stack_tx), Some(queue_tx)) => match queue_tx.cmp(stack_tx) {
+                    std::cmp::Ordering::Less => (stack_tx, true),
+                    _ => (queue_tx, false),
+                },
+                (Some(stack_tx), None) => (stack_tx, true),
+                (None, Some(queue_tx)) => (queue_tx, false),
+                (None, None) => unreachable!(),
+            };
+
+            if from_stack {
+                mempool_stack.pop();
+            } else {
+                modified.pop();
+            }
+
+            if blocks.len() < (MAX_BLOCKS - 1)
+                && ((block_weight + (4 * next_tx.ancestor_sigop_adjusted_vsize())
+                    >= MAX_BLOCK_WEIGHT_UNITS)
+                    || (block_sigops + next_tx.ancestor_sigops() > BLOCK_SIGOPS))
+            {
+                // hold this package in an overflow list while we check for smaller options
+                overflow.push(next_tx.uid);
+                failures += 1;
+            } else {
+                let mut package: Vec<(u32, u32, usize)> = Vec::new();
+                let mut cluster: Vec<u32> = Vec::new();
+                let is_cluster: bool = !next_tx.ancestors.is_empty();
+                for ancestor_id in &next_tx.ancestors {
+                    if let Some(Some(ancestor)) = audit_pool.get(*ancestor_id as usize) {
+                        package.push((*ancestor_id, ancestor.order(), ancestor.ancestors.len()));
+                    }
+                }
+                package.sort_unstable_by(|a, b| -> Ordering {
+                    if a.2 != b.2 {
+                        // order by ascending ancestor count
+                        a.2.cmp(&b.2)
+                    } else if a.1 != b.1 {
+                        // tie-break by ascending partial txid
+                        a.1.cmp(&b.1)
+                    } else {
+                        // tie-break partial txid collisions by ascending uid
+                        a.0.cmp(&b.0)
+                    }
+                });
+                package.push((next_tx.uid, next_tx.order(), next_tx.ancestors.len()));
+
+                let cluster_rate = next_tx.cluster_rate();
+
+                for (txid, _, _) in &package {
+                    cluster.push(*txid);
+                    if let Some(Some(tx)) = audit_pool.get_mut(*txid as usize) {
+                        tx.used = true;
+                        tx.set_dirty_if_different(cluster_rate);
+                        transactions.push(tx.uid);
+                        block_weight += tx.weight;
+                        block_sigops += tx.sigops;
+                    }
+                    update_descendants(*txid, &mut audit_pool, &mut modified, cluster_rate);
+                }
+
+                if is_cluster {
+                    clusters.push(cluster);
+                }
+
+                failures = 0;
+            }
         }
 
         // this block is full
```
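Why the restructure matters: in the old code, when neither the stack nor the modified queue had a valid candidate, `continue` jumped straight back to the top of the loop and skipped the block-finalization logic further down, so once every remaining package sat in `overflow`, the selection state never changed and the loop could spin indefinitely. Moving the body into an `else` branch lets an empty selection fall through to the finalization checks below. Here is a minimal, self-contained sketch of that control-flow pattern; the names (`build_blocks`, the uid/weight tuples, `max_weight`) are hypothetical and the logic is heavily simplified, it is not the real `gbt()`:

```rust
// Hypothetical, heavily simplified sketch (not the real gbt()) showing why
// the `continue` was replaced by an `else` branch. With `continue`, a
// mempool in which only unmineable packages remain never reaches the
// finalization code at the bottom of the loop, so the loop spins forever.
fn build_blocks(mut mempool: Vec<(u32, u64)>, max_weight: u64) -> (Vec<Vec<u32>>, Vec<u32>) {
    let mut blocks: Vec<Vec<u32>> = Vec::new();
    let mut overflow: Vec<u32> = Vec::new();
    let mut current: Vec<u32> = Vec::new();
    let mut weight: u64 = 0;
    while !mempool.is_empty() || !current.is_empty() {
        match mempool.pop() {
            None => {
                // No candidate: fall through to the finalization below instead
                // of `continue`, which would restart the loop with nothing changed.
            }
            Some((uid, w)) if weight + w <= max_weight => {
                current.push(uid);
                weight += w;
            }
            // Too large for the remaining space: park it in overflow.
            Some((uid, _)) => overflow.push(uid),
        }
        if mempool.is_empty() {
            if current.is_empty() {
                break; // mirrors the new "trying to push an empty block" guard
            }
            blocks.push(std::mem::take(&mut current));
            weight = 0;
        }
    }
    (blocks, overflow)
}

fn main() {
    // A 50-weight tx can never fit in a 40-weight block: it ends up in overflow.
    let (blocks, overflow) = build_blocks(vec![(1, 10), (2, 50), (3, 20)], 40);
    println!("blocks: {:?}, unmineable: {:?}", blocks, overflow);
}
```

Run against a mempool whose only remaining transactions can never fit, the `continue` version of this loop never terminates, while the fall-through version finalizes what it has and exits with the leftovers reported in `overflow`.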
```diff
@@ -203,10 +205,14 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap, accelerations: &[ThreadAccelerat
         let queue_is_empty = mempool_stack.is_empty() && modified.is_empty();
         if (exceeded_package_tries || queue_is_empty) && blocks.len() < (MAX_BLOCKS - 1) {
             // finalize this block
-            if !transactions.is_empty() {
-                blocks.push(transactions);
-                block_weights.push(block_weight);
+            if transactions.is_empty() {
+                info!("trying to push an empty block! breaking loop! mempool {:#?} | modified {:#?} | overflow {:#?}", mempool_stack.len(), modified.len(), overflow.len());
+                break;
             }
+
+            blocks.push(transactions);
+            block_weights.push(block_weight);
+
             // reset for the next block
             transactions = Vec::with_capacity(initial_txes_per_block);
             block_weight = BLOCK_RESERVED_WEIGHT;
```
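The finalization guard is inverted to match: an empty candidate block used to be skipped silently while the loop kept running, but under the new control flow, reaching this point with no selected transactions means nothing mineable remains, so the loop logs the queue sizes and breaks.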
```diff
@@ -265,6 +271,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap, accelerations: &[ThreadAccelerat
         block_weights,
         clusters,
         rates,
+        overflow,
     }
 }
 
```
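`overflow` is also added to the returned result, and to the `GbtResult` struct in the N-API bindings: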
```diff
@@ -133,6 +133,7 @@ pub struct GbtResult {
     pub block_weights: Vec<u32>,
     pub clusters: Vec<Vec<u32>>,
     pub rates: Vec<Vec<f64>>, // Tuples not supported. u32 fits inside f64
+    pub overflow: Vec<u32>,
 }
 
 /// All on another thread, this runs an arbitrary task in between
```
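This mirrors the TypeScript declaration shown earlier; `Vec<u32>` crosses the N-API boundary as `Array<number>`. On the Node side, `MempoolBlocks` destructures the new field in both the make and update paths: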
```diff
@@ -368,12 +368,15 @@ class MempoolBlocks {
     // run the block construction algorithm in a separate thread, and wait for a result
     const rustGbt = saveResults ? this.rustGbtGenerator : new GbtGenerator();
     try {
-      const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
+      const { blocks, blockWeights, rates, clusters, overflow } = this.convertNapiResultTxids(
         await rustGbt.make(Object.values(newMempool) as RustThreadTransaction[], convertedAccelerations as RustThreadAcceleration[], this.nextUid),
       );
       if (saveResults) {
         this.rustInitialized = true;
       }
+      const mempoolSize = Object.keys(newMempool).length;
+      const resultMempoolSize = blocks.reduce((total, block) => total + block.length, 0) + overflow.length;
+      logger.debug(`RUST updateBlockTemplates returned ${resultMempoolSize} txs out of ${mempoolSize} in the mempool, ${overflow.length} were unmineable`);
       const processed = this.processBlockTemplates(newMempool, blocks, blockWeights, rates, clusters, accelerations, accelerationPool, saveResults);
       logger.debug(`RUST makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
       return processed;
```
```diff
@@ -424,7 +427,7 @@ class MempoolBlocks {
 
     // run the block construction algorithm in a separate thread, and wait for a result
     try {
-      const { blocks, blockWeights, rates, clusters } = this.convertNapiResultTxids(
+      const { blocks, blockWeights, rates, clusters, overflow } = this.convertNapiResultTxids(
         await this.rustGbtGenerator.update(
           added as RustThreadTransaction[],
           removedUids,
@@ -432,9 +435,10 @@ class MempoolBlocks {
           this.nextUid,
         ),
       );
-      const resultMempoolSize = blocks.reduce((total, block) => total + block.length, 0);
+      const resultMempoolSize = blocks.reduce((total, block) => total + block.length, 0) + overflow.length;
+      logger.debug(`RUST updateBlockTemplates returned ${resultMempoolSize} txs out of ${mempoolSize} in the mempool, ${overflow.length} were unmineable`);
       if (mempoolSize !== resultMempoolSize) {
         throw new Error('GBT returned wrong number of transactions, cache is probably out of sync');
       } else {
         const processed = this.processBlockTemplates(newMempool, blocks, blockWeights, rates, clusters, accelerations, accelerationPool, true);
         this.removeUids(removedUids);
```
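With overflow counted, the consistency check in `updateBlockTemplates` expresses a complete invariant: every transaction handed to the generator must come back either inside a projected block or in the overflow list. A hedged sketch of that invariant, using a hypothetical helper name and kept in Rust for consistency with the sketch above:

```rust
// Hypothetical helper (not part of the PR) expressing the invariant behind
// the TypeScript sanity check: every uid handed to the generator must come
// back either inside a projected block or in the overflow list.
fn result_is_complete(blocks: &[Vec<u32>], overflow: &[u32], mempool_len: usize) -> bool {
    let returned: usize = blocks.iter().map(|block| block.len()).sum::<usize>() + overflow.len();
    returned == mempool_len
}
```

Previously, any transaction parked in overflow made `resultMempoolSize` undercount the mempool, so a perfectly healthy cache could throw the "cache is probably out of sync" error.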
```diff
@@ -658,8 +662,8 @@ class MempoolBlocks {
     return { blocks: convertedBlocks, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }};
   }
 
-  private convertNapiResultTxids({ blocks, blockWeights, rates, clusters }: GbtResult)
-    : { blocks: string[][], blockWeights: number[], rates: [string, number][], clusters: string[][] } {
+  private convertNapiResultTxids({ blocks, blockWeights, rates, clusters, overflow }: GbtResult)
+    : { blocks: string[][], blockWeights: number[], rates: [string, number][], clusters: string[][], overflow: string[] } {
     const convertedBlocks: string[][] = blocks.map(block => block.map(uid => {
       const txid = this.uidMap.get(uid);
       if (txid !== undefined) {
@@ -677,7 +681,15 @@ class MempoolBlocks {
     for (const cluster of clusters) {
       convertedClusters.push(cluster.map(uid => this.uidMap.get(uid)) as string[]);
     }
-    return { blocks: convertedBlocks, blockWeights, rates: convertedRates, clusters: convertedClusters };
+    const convertedOverflow: string[] = overflow.map(uid => {
+      const txid = this.uidMap.get(uid);
+      if (txid !== undefined) {
+        return txid;
+      } else {
+        throw new Error('GBT returned an unmineable transaction with unknown uid');
+      }
+    });
+    return { blocks: convertedBlocks, blockWeights, rates: convertedRates, clusters: convertedClusters, overflow: convertedOverflow };
   }
 }
```
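Overflowed uids go through the same uid-to-txid conversion as block contents, and an unknown uid is surfaced as a hard error rather than dropped, since it indicates the uid map and the generator have diverged.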
```diff
@@ -173,10 +173,13 @@ function makeBlockTemplates(mempool: Map<number, CompactThreadTransaction>)
     // this block is full
     const exceededPackageTries = failures > 1000 && blockWeight > (config.MEMPOOL.BLOCK_WEIGHT_UNITS - 4000);
     const queueEmpty = top >= mempoolArray.length && modified.isEmpty();
+
     if ((exceededPackageTries || queueEmpty) && blocks.length < 7) {
       // construct this block
       if (transactions.length) {
         blocks.push(transactions.map(t => t.uid));
+      } else {
+        break;
       }
       // reset for the next block
       transactions = [];
```
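Finally, the non-Rust worker-thread implementation receives the matching fix: when a block boundary is reached and nothing could be selected, the loop breaks instead of spinning on the same unmineable set.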