Merge pull request #1772 from mempool/nymkappa/feature/websocket-block-count
Refactor pool ranking rxjs observable
This commit is contained in:
		
						commit
						91572ea864
					
				@ -1,5 +1,5 @@
 | 
			
		||||
import { Component, OnInit, ChangeDetectionStrategy, Input } from '@angular/core';
 | 
			
		||||
import { BehaviorSubject, combineLatest, Observable, timer } from 'rxjs';
 | 
			
		||||
import { BehaviorSubject, combineLatest, concat, Observable, timer } from 'rxjs';
 | 
			
		||||
import { delayWhen, map, retryWhen, scan, skip, switchMap, tap } from 'rxjs/operators';
 | 
			
		||||
import { BlockExtended } from 'src/app/interfaces/node-api.interface';
 | 
			
		||||
import { ApiService } from 'src/app/services/api.service';
 | 
			
		||||
@ -15,7 +15,7 @@ import { WebsocketService } from 'src/app/services/websocket.service';
 | 
			
		||||
export class BlocksList implements OnInit {
 | 
			
		||||
  @Input() widget: boolean = false;
 | 
			
		||||
 | 
			
		||||
  blocks$: Observable<any[]> = undefined;
 | 
			
		||||
  blocks$: Observable<BlockExtended[]> = undefined;
 | 
			
		||||
 | 
			
		||||
  indexingAvailable = false;
 | 
			
		||||
  isLoading = true;
 | 
			
		||||
@ -27,6 +27,7 @@ export class BlocksList implements OnInit {
 | 
			
		||||
  blocksCount: number;
 | 
			
		||||
  fromHeightSubject: BehaviorSubject<number> = new BehaviorSubject(this.fromBlockHeight);
 | 
			
		||||
  skeletonLines: number[] = [];
 | 
			
		||||
  lastBlockHeight = -1;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    private apiService: ApiService,
 | 
			
		||||
@ -57,6 +58,7 @@ export class BlocksList implements OnInit {
 | 
			
		||||
                  this.blocksCount = blocks[0].height + 1;
 | 
			
		||||
                }
 | 
			
		||||
                this.isLoading = false;
 | 
			
		||||
                this.lastBlockHeight = Math.max(...blocks.map(o => o.height))
 | 
			
		||||
              }),
 | 
			
		||||
              map(blocks => {
 | 
			
		||||
                if (this.indexingAvailable) {
 | 
			
		||||
@ -73,12 +75,18 @@ export class BlocksList implements OnInit {
 | 
			
		||||
              }),
 | 
			
		||||
              retryWhen(errors => errors.pipe(delayWhen(() => timer(10000))))
 | 
			
		||||
            )
 | 
			
		||||
          })
 | 
			
		||||
        })
 | 
			
		||||
      ),
 | 
			
		||||
      this.stateService.blocks$
 | 
			
		||||
        .pipe(
 | 
			
		||||
          skip(this.stateService.env.MEMPOOL_BLOCKS_AMOUNT - 1),
 | 
			
		||||
        ),
 | 
			
		||||
          switchMap((block) => {
 | 
			
		||||
            if (block[0].height <= this.lastBlockHeight) {
 | 
			
		||||
              return []; // Return an empty stream so the last pipe is not executed
 | 
			
		||||
            }
 | 
			
		||||
            this.lastBlockHeight = block[0].height;
 | 
			
		||||
            return [block];
 | 
			
		||||
          })
 | 
			
		||||
        )
 | 
			
		||||
    ])
 | 
			
		||||
      .pipe(
 | 
			
		||||
        scan((acc, blocks) => {
 | 
			
		||||
 | 
			
		||||
@ -2,8 +2,8 @@ import { ChangeDetectionStrategy, Component, Input, NgZone, OnInit, HostBinding
 | 
			
		||||
import { FormBuilder, FormGroup } from '@angular/forms';
 | 
			
		||||
import { Router } from '@angular/router';
 | 
			
		||||
import { EChartsOption, PieSeriesOption } from 'echarts';
 | 
			
		||||
import { combineLatest, Observable, of } from 'rxjs';
 | 
			
		||||
import { catchError, map, share, skip, startWith, switchMap, tap } from 'rxjs/operators';
 | 
			
		||||
import { concat, Observable } from 'rxjs';
 | 
			
		||||
import { map, share, startWith, switchMap, tap } from 'rxjs/operators';
 | 
			
		||||
import { SinglePoolStats } from 'src/app/interfaces/node-api.interface';
 | 
			
		||||
import { SeoService } from 'src/app/services/seo.service';
 | 
			
		||||
import { StorageService } from '../..//services/storage.service';
 | 
			
		||||
@ -58,15 +58,7 @@ export class PoolRankingComponent implements OnInit {
 | 
			
		||||
    this.radioGroupForm = this.formBuilder.group({ dateSpan: this.miningWindowPreference });
 | 
			
		||||
    this.radioGroupForm.controls.dateSpan.setValue(this.miningWindowPreference);
 | 
			
		||||
 | 
			
		||||
    // When...
 | 
			
		||||
    this.miningStatsObservable$ = combineLatest([
 | 
			
		||||
      // ...a new block is mined
 | 
			
		||||
      this.stateService.blocks$
 | 
			
		||||
        .pipe(
 | 
			
		||||
          // (we always receives some blocks at start so only trigger for the last one)
 | 
			
		||||
          skip(this.stateService.env.MEMPOOL_BLOCKS_AMOUNT - 1),
 | 
			
		||||
        ),
 | 
			
		||||
      // ...or we change the timespan
 | 
			
		||||
    this.miningStatsObservable$ = concat(
 | 
			
		||||
      this.radioGroupForm.get('dateSpan').valueChanges
 | 
			
		||||
        .pipe(
 | 
			
		||||
          startWith(this.miningWindowPreference), // (trigger when the page loads)
 | 
			
		||||
@ -76,18 +68,19 @@ export class PoolRankingComponent implements OnInit {
 | 
			
		||||
              this.storageService.setValue('miningWindowPreference', value);
 | 
			
		||||
            }
 | 
			
		||||
            this.miningWindowPreference = value;
 | 
			
		||||
          }),
 | 
			
		||||
          switchMap(() => {
 | 
			
		||||
            return this.miningService.getMiningStats(this.miningWindowPreference);
 | 
			
		||||
          })
 | 
			
		||||
        )
 | 
			
		||||
    ])
 | 
			
		||||
      // ...then refresh the mining stats
 | 
			
		||||
        ),
 | 
			
		||||
        this.stateService.blocks$
 | 
			
		||||
          .pipe(
 | 
			
		||||
            switchMap(() => {
 | 
			
		||||
              return this.miningService.getMiningStats(this.miningWindowPreference);
 | 
			
		||||
            })
 | 
			
		||||
          )
 | 
			
		||||
      )
 | 
			
		||||
      .pipe(
 | 
			
		||||
        switchMap(() => {
 | 
			
		||||
          this.isLoading = true;
 | 
			
		||||
          return this.miningService.getMiningStats(this.miningWindowPreference)
 | 
			
		||||
            .pipe(
 | 
			
		||||
              catchError((e) => of(this.getEmptyMiningStat()))
 | 
			
		||||
            );
 | 
			
		||||
        }),
 | 
			
		||||
        map(data => {
 | 
			
		||||
          data.pools = data.pools.map((pool: SinglePoolStats) => this.formatPoolUI(pool));
 | 
			
		||||
          data['minersLuck'] = (100 * (data.blockCount / 1008)).toFixed(2); // luck 1w
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user