use observable to trigger tracking subscribers

This commit is contained in:
Richard Ramos 2020-02-15 09:05:33 -04:00
parent d3284cb1e5
commit 3c349e4ef7
1 changed files with 23 additions and 21 deletions

View File

@ -1,5 +1,5 @@
import {BehaviorSubject, from} from "rxjs";
import {distinctUntilChanged, map, exhaustMap, shareReplay} from "rxjs/operators";
import {timer, from, Observable} from "rxjs";
import {distinctUntilChanged, map, exhaustMap, share, shareReplay} from "rxjs/operators";
import equal from "fast-deep-equal";
import Database from "./database/database.js";
import NullDatabase from "./database/nullDatabase.js";
@ -14,9 +14,7 @@ import hash from "object-hash";
export default class Subspace {
observables = {};
newBlocksSubscription = null;
intervalTracker = null;
intervalObservable = null;
// Stats
latestBlockNumber = undefined;
@ -39,7 +37,6 @@ export default class Subspace {
this.networkId = undefined;
this.isWebsocketProvider = options.disableSubscriptions ? false : !!provider.on;
this.triggerSubject = new BehaviorSubject();
}
init() {
@ -147,20 +144,28 @@ export default class Subspace {
}
_initNewBlocksSubscription() {
if (this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
if(this.options.callInterval !== 0) return;
this.newBlocksSubscription = this.web3.subscribe("newBlockHeaders", (err, result) => {
if (err) {
console.error(err);
return;
}
this.triggerSubject.next();
const newBlockObservable = new Observable(observer => {
observer.next(); // initial tick;
const newBlocksSubscription = this.web3.subscribe("newBlockHeaders", err => {
if (err) {
observer.error(err);
return;
}
observer.next();
});
return () => newBlocksSubscription.unsubscribe();
});
this.intervalObservable = newBlockObservable.pipe(share());
}
_initCallInterval() {
if (this.intervalTracker != null || this.options.callInterval === 0) return;
this.intervalTracker = setInterval(() => this.triggerSubject.next(), this.options.callInterval);
if (this.intervalObservable != null || this.options.callInterval === 0) return;
this.intervalObservable = timer(0, this.options.callInterval).pipe(share());
}
_getObservable(subjectHash, observableBuilder) {
@ -171,7 +176,7 @@ export default class Subspace {
_getDistinctObservableFromPromise(subjectName, promiseCB, cb) {
return this._getObservable(subjectName, () => {
let observable = this.triggerSubject.pipe(
let observable = this.intervalObservable.pipe(
exhaustMap(() => from(promiseCB())),
distinctUntilChanged((a, b) => equal(a, b))
);
@ -287,8 +292,8 @@ export default class Subspace {
this.web3.getBalance(address).then(balance => console.log("Balance: ", balance));
return this.web3.getBalance(address);
} else {
// balanceOf
const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(address);
// balanceOf
const data = "0x70a08231000000000000000000000000" + stripHexPrefix(address);
return new Promise((resolve, reject) => this.web3.call({to: erc20Address, data}).then(balance => resolve(hexToDec(balance))).catch(reject));
}
});
@ -331,9 +336,6 @@ export default class Subspace {
}
close() {
clearInterval(this.intervalTracker);
if (this.newBlocksSubscription) this.newBlocksSubscription.unsubscribe();
this.eventSyncer.close();
this.intervalTracker = null;
}
}