diff --git a/src/subspace.js b/src/subspace.js index 60b58ac..63e8229 100644 --- a/src/subspace.js +++ b/src/subspace.js @@ -1,4 +1,4 @@ -import { ReplaySubject, BehaviorSubject } from 'rxjs'; +import { ReplaySubject, BehaviorSubject, Subject } from 'rxjs'; import { distinctUntilChanged, map } from 'rxjs/operators'; import equal from 'fast-deep-equal'; import Database from './database/database.js'; @@ -10,6 +10,7 @@ import stripHexPrefix from 'strip-hex-prefix'; import {hexToDec} from 'hex2dec'; import EventSyncer from './eventSyncer'; import LogSyncer from './logSyncer'; +import hash from 'object-hash'; export default class Subspace { @@ -30,19 +31,13 @@ export default class Subspace { this.networkId = undefined; this.isWebsocketProvider = options.disableSubscriptions ? false : !!provider.on; + // Stats + this.latestBlockNumber = undefined; + this.latestGasPrice = undefined; + this.latestBlock = undefined; + this.latest10Blocks = []; -// TODO: part of manager -this.latestBlockNumber = undefined; -this.latestGasPrice = undefined; -this.latestBlock = undefined; -this.latest10Blocks = []; - - -// TODO: part of manager -this.blockNumberObservable = null; -this.gasPriceObservable = null; -this.blockObservable = null; -this.blockTimeObservable = null; + this.subjects = {}; this.newBlocksSubscription = null; this.intervalTracker = null; @@ -77,7 +72,7 @@ this.blockTimeObservable = null; this.latest10Blocks = await Promise.all(this.latest10Blocks); } - // TODO: part of manager + // Initial stats this.latestBlockNumber = block.number; this.latestGasPrice = gasPrice; this.latestBlock = block; @@ -149,9 +144,13 @@ this.blockTimeObservable = null; } // TODO: get contract abi/address instead - trackEvent(contractInstance, eventName, filterConditionsOrCb) { + trackEvent(contractInstance, eventName, filterConditions) { + const subjectHash = hash({address: contractInstance.options.address, networkId: this.networkId, eventName, filterConditions}); + + if(this.subjects[subjectHash]) return this.subjects[subjectHash]; + let deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks; - let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, deleteFrom, this.networkId); + let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId); returnSub.map = (prop) => { return returnSub.pipe(map((x) => { @@ -168,6 +167,8 @@ this.blockTimeObservable = null; })) } + this.subjects[subjectHash] = returnSub; + return returnSub; } @@ -181,7 +182,14 @@ this.blockTimeObservable = null; trackLogs(options, inputsABI) { if(!this.isWebsocketProvider) console.warn("This method only works with websockets"); - return this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId); + + const subjectHash = hash({inputsABI, options}); + + if(this.subjects[subjectHash]) return this.subjects[subjectHash]; + + this.subjects[subjectHash] = this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId); + + return this.subjects[subjectHash]; } _initNewBlocksSubscription() { @@ -209,22 +217,26 @@ this.blockTimeObservable = null; }, this.options.callInterval); } - // TODO: should save value in database? trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) { - const sub = new ReplaySubject(); + const subjectHash = hash({address: contractInstance.options.address, networkId: this.networkId, propName, methodArgs, callArgs}); + + if(this.subjects[subjectHash]) return this.subjects[subjectHash]; + + const subject = new Subject(); if (!Array.isArray(methodArgs)) { methodArgs = [methodArgs] } - const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs) + const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs); + const callContractMethod = () => { method.call.apply(method.call, [callArgs, (err, result) => { if (err) { - sub.error(err); + subject.error(err); return; } - sub.next(result); + subject.next(result); }]); }; @@ -232,7 +244,7 @@ this.blockTimeObservable = null; this.callables.push(callContractMethod); - let returnSub = sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); + const returnSub = subject.pipe(distinctUntilChanged((a, b) => equal(a, b))); returnSub.map = (prop) => { return returnSub.pipe(map((x) => { @@ -249,16 +261,23 @@ this.blockTimeObservable = null; })) } + this.subjects[subjectHash] = returnSub; + return returnSub; } _addDistinctCallable(trackAttribute, cbBuilder, subject, subjectArg = undefined) { - if(this[trackAttribute]) return this[trackAttribute].pipe(distinctUntilChanged((a, b) => equal(a, b))); - this[trackAttribute] = new subject(subjectArg); - const cb = cbBuilder(this[trackAttribute]); + if(this.subjects[trackAttribute]) return this.subjects[trackAttribute]; + + const sub = new subject(subjectArg); + + const cb = cbBuilder(sub); cb(); this.callables.push(cb); - return this[trackAttribute].pipe(distinctUntilChanged((a, b) => equal(a, b))); + + this.subjects[trackAttribute] = sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); + + return this.subjects[trackAttribute]; } trackBlock() {