From 3c349e4ef779a61e4e82f07f9cee129c7269bb02 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sat, 15 Feb 2020 09:05:33 -0400 Subject: [PATCH] use observable to trigger tracking subscribers --- src/subspace.js | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/subspace.js b/src/subspace.js index 5640f9b..e356646 100644 --- a/src/subspace.js +++ b/src/subspace.js @@ -1,5 +1,5 @@ -import {BehaviorSubject, from} from "rxjs"; -import {distinctUntilChanged, map, exhaustMap, shareReplay} from "rxjs/operators"; +import {timer, from, Observable} from "rxjs"; +import {distinctUntilChanged, map, exhaustMap, share, shareReplay} from "rxjs/operators"; import equal from "fast-deep-equal"; import Database from "./database/database.js"; import NullDatabase from "./database/nullDatabase.js"; @@ -14,9 +14,7 @@ import hash from "object-hash"; export default class Subspace { observables = {}; - - newBlocksSubscription = null; - intervalTracker = null; + intervalObservable = null; // Stats latestBlockNumber = undefined; @@ -39,7 +37,6 @@ export default class Subspace { this.networkId = undefined; this.isWebsocketProvider = options.disableSubscriptions ? false : !!provider.on; - this.triggerSubject = new BehaviorSubject(); } init() { @@ -147,20 +144,28 @@ export default class Subspace { } _initNewBlocksSubscription() { - if (this.newBlocksSubscription != null || this.options.callInterval !== 0) return; + if(this.options.callInterval !== 0) return; - this.newBlocksSubscription = this.web3.subscribe("newBlockHeaders", (err, result) => { - if (err) { - console.error(err); - return; - } - this.triggerSubject.next(); + const newBlockObservable = new Observable(observer => { + observer.next(); // initial tick; + + const newBlocksSubscription = this.web3.subscribe("newBlockHeaders", err => { + if (err) { + observer.error(err); + return; + } + observer.next(); + }); + + return () => newBlocksSubscription.unsubscribe(); }); + + this.intervalObservable = newBlockObservable.pipe(share()); } _initCallInterval() { - if (this.intervalTracker != null || this.options.callInterval === 0) return; - this.intervalTracker = setInterval(() => this.triggerSubject.next(), this.options.callInterval); + if (this.intervalObservable != null || this.options.callInterval === 0) return; + this.intervalObservable = timer(0, this.options.callInterval).pipe(share()); } _getObservable(subjectHash, observableBuilder) { @@ -171,7 +176,7 @@ export default class Subspace { _getDistinctObservableFromPromise(subjectName, promiseCB, cb) { return this._getObservable(subjectName, () => { - let observable = this.triggerSubject.pipe( + let observable = this.intervalObservable.pipe( exhaustMap(() => from(promiseCB())), distinctUntilChanged((a, b) => equal(a, b)) ); @@ -287,8 +292,8 @@ export default class Subspace { this.web3.getBalance(address).then(balance => console.log("Balance: ", balance)); return this.web3.getBalance(address); } else { - // balanceOf - const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(address); + // balanceOf + const data = "0x70a08231000000000000000000000000" + stripHexPrefix(address); return new Promise((resolve, reject) => this.web3.call({to: erc20Address, data}).then(balance => resolve(hexToDec(balance))).catch(reject)); } }); @@ -331,9 +336,6 @@ export default class Subspace { } close() { - clearInterval(this.intervalTracker); - if (this.newBlocksSubscription) this.newBlocksSubscription.unsubscribe(); this.eventSyncer.close(); - this.intervalTracker = null; } }