feat: use subject dictionary to avoid creating multiple duplicated subscriptions (#67)

* feat: keep track of props subjects
* Keep track of subjects for event and log trackers
This commit is contained in:
Richard Ramos 2020-02-11 09:39:37 -04:00 committed by GitHub
parent 556bf5191e
commit 81e09f1a8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 27 deletions

View File

@ -1,4 +1,4 @@
import { ReplaySubject, BehaviorSubject } from 'rxjs';
import { ReplaySubject, BehaviorSubject, Subject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
import equal from 'fast-deep-equal';
import Database from './database/database.js';
@ -10,6 +10,7 @@ import stripHexPrefix from 'strip-hex-prefix';
import {hexToDec} from 'hex2dec';
import EventSyncer from './eventSyncer';
import LogSyncer from './logSyncer';
import hash from 'object-hash';
export default class Subspace {
@ -30,19 +31,13 @@ export default class Subspace {
this.networkId = undefined;
this.isWebsocketProvider = options.disableSubscriptions ? false : !!provider.on;
// Stats
this.latestBlockNumber = undefined;
this.latestGasPrice = undefined;
this.latestBlock = undefined;
this.latest10Blocks = [];
// TODO: part of manager
this.latestBlockNumber = undefined;
this.latestGasPrice = undefined;
this.latestBlock = undefined;
this.latest10Blocks = [];
// TODO: part of manager
this.blockNumberObservable = null;
this.gasPriceObservable = null;
this.blockObservable = null;
this.blockTimeObservable = null;
this.subjects = {};
this.newBlocksSubscription = null;
this.intervalTracker = null;
@ -77,7 +72,7 @@ this.blockTimeObservable = null;
this.latest10Blocks = await Promise.all(this.latest10Blocks);
}
// TODO: part of manager
// Initial stats
this.latestBlockNumber = block.number;
this.latestGasPrice = gasPrice;
this.latestBlock = block;
@ -149,9 +144,13 @@ this.blockTimeObservable = null;
}
// TODO: get contract abi/address instead
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
trackEvent(contractInstance, eventName, filterConditions) {
const subjectHash = hash({address: contractInstance.options.address, networkId: this.networkId, eventName, filterConditions});
if(this.subjects[subjectHash]) return this.subjects[subjectHash];
let deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks;
let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, deleteFrom, this.networkId);
let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId);
returnSub.map = (prop) => {
return returnSub.pipe(map((x) => {
@ -168,6 +167,8 @@ this.blockTimeObservable = null;
}))
}
this.subjects[subjectHash] = returnSub;
return returnSub;
}
@ -181,7 +182,14 @@ this.blockTimeObservable = null;
trackLogs(options, inputsABI) {
if(!this.isWebsocketProvider) console.warn("This method only works with websockets");
return this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId);
const subjectHash = hash({inputsABI, options});
if(this.subjects[subjectHash]) return this.subjects[subjectHash];
this.subjects[subjectHash] = this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId);
return this.subjects[subjectHash];
}
_initNewBlocksSubscription() {
@ -209,22 +217,26 @@ this.blockTimeObservable = null;
}, this.options.callInterval);
}
// TODO: should save value in database?
trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) {
const sub = new ReplaySubject();
const subjectHash = hash({address: contractInstance.options.address, networkId: this.networkId, propName, methodArgs, callArgs});
if(this.subjects[subjectHash]) return this.subjects[subjectHash];
const subject = new Subject();
if (!Array.isArray(methodArgs)) {
methodArgs = [methodArgs]
}
const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs)
const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs);
const callContractMethod = () => {
method.call.apply(method.call, [callArgs, (err, result) => {
if (err) {
sub.error(err);
subject.error(err);
return;
}
sub.next(result);
subject.next(result);
}]);
};
@ -232,7 +244,7 @@ this.blockTimeObservable = null;
this.callables.push(callContractMethod);
let returnSub = sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
const returnSub = subject.pipe(distinctUntilChanged((a, b) => equal(a, b)));
returnSub.map = (prop) => {
return returnSub.pipe(map((x) => {
@ -249,16 +261,23 @@ this.blockTimeObservable = null;
}))
}
this.subjects[subjectHash] = returnSub;
return returnSub;
}
_addDistinctCallable(trackAttribute, cbBuilder, subject, subjectArg = undefined) {
if(this[trackAttribute]) return this[trackAttribute].pipe(distinctUntilChanged((a, b) => equal(a, b)));
this[trackAttribute] = new subject(subjectArg);
const cb = cbBuilder(this[trackAttribute]);
if(this.subjects[trackAttribute]) return this.subjects[trackAttribute];
const sub = new subject(subjectArg);
const cb = cbBuilder(sub);
cb();
this.callables.push(cb);
return this[trackAttribute].pipe(distinctUntilChanged((a, b) => equal(a, b)));
this.subjects[trackAttribute] = sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
return this.subjects[trackAttribute];
}
trackBlock() {