diff --git a/src/eventSyncer.js b/src/eventSyncer.js index b87e98a..f9b746f 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -67,7 +67,7 @@ class EventSyncer { if (this.isWebsocketProvider) { const fnSubscribe = this.subscribeToEvent(eventKey, contractInstance, eventName); - const eth_subscribe = this.eventScanner.scan( + const ethSubscription = this.eventScanner.scan( fnDBEvents, fnPastEvents, fnSubscribe, @@ -75,25 +75,11 @@ class EventSyncer { lastKnownBlock, filterConditions ); - - const og_subscribe = sub.subscribe; - sub.subscribe = async (next, error, complete) => { - const s = og_subscribe.apply(sub, [next, error, complete]); - s.add(() => { - // Removing web3js subscription when rxJS unsubscribe is executed - eth_subscribe.then(susc => { - if (susc) { - susc.unsubscribe(); - } - }); - }); - return s; - }; + return [sub, ethSubscription]; } else { this.eventScanner.scan(fnDBEvents, fnPastEvents, lastKnownBlock, filterConditions); + return [sub, undefined]; } - - return sub; } getPastEvents = (eventKey, contractInstance, eventName, filters) => async (fromBlock, toBlock, hardLimit) => { diff --git a/src/logSyncer.js b/src/logSyncer.js index b2e50a5..bfb634b 100644 --- a/src/logSyncer.js +++ b/src/logSyncer.js @@ -67,24 +67,14 @@ class LogSyncer { this.events.emit("updateDB"); }); - const eth_subscribe = this._retrieveEvents( + const ethSubscription = this._retrieveEvents( eventKey, eventSummary.firstKnownBlock, eventSummary.lastKnownBlock, filterConditions ); - const og_subscribe = sub.subscribe; - sub.subscribe = (next, error, complete) => { - const s = og_subscribe.apply(sub, [next, error, complete]); - s.add(() => { - // Removing web3js subscription when rxJS unsubscribe is executed - if (eth_subscribe) eth_subscribe.unsubscribe(); - }); - return s; - }; - - return sub; + return [sub, ethSubscription]; } _retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions) { diff --git a/src/subspace.js b/src/subspace.js index 0248032..aaf43b8 100644 --- a/src/subspace.js +++ b/src/subspace.js @@ -1,5 +1,5 @@ -import {ReplaySubject, BehaviorSubject} from "rxjs"; -import {distinctUntilChanged, map} from "rxjs/operators"; +import {BehaviorSubject, from} from "rxjs"; +import {distinctUntilChanged, map, exhaustMap, shareReplay} from "rxjs/operators"; import equal from "fast-deep-equal"; import Database from "./database/database.js"; import NullDatabase from "./database/nullDatabase.js"; @@ -13,8 +13,7 @@ import LogSyncer from "./logSyncer"; import hash from "object-hash"; export default class Subspace { - subjects = {}; - callables = []; + observables = {}; newBlocksSubscription = null; intervalTracker = null; @@ -40,6 +39,7 @@ export default class Subspace { this.networkId = undefined; this.isWebsocketProvider = options.disableSubscriptions ? false : !!provider.on; + this.triggerSubject = new BehaviorSubject(); } init() { @@ -154,32 +154,36 @@ export default class Subspace { console.error(err); return; } - - this.callables.forEach(fn => fn()); + this.triggerSubject.next(); }); } _initCallInterval() { if (this.intervalTracker != null || this.options.callInterval === 0) return; - - this.intervalTracker = setInterval(() => { - this.callables.forEach(fn => fn()); - }, this.options.callInterval); + this.intervalTracker = setInterval(() => this.triggerSubject.next(), this.options.callInterval); } - _getSubject(subjectHash, subjectCB) { - if (this.subjects[subjectHash]) return this.subjects[subjectHash]; - this.subjects[subjectHash] = subjectCB(); - return this.subjects[subjectHash]; + _getObservable(subjectHash, observableBuilder) { + if (this.observables[subjectHash]) return this.observables[subjectHash]; + this.observables[subjectHash] = observableBuilder(); + return this.observables[subjectHash]; } - _addDistinctCallable(trackAttribute, cbBuilder, SubjectType, subjectArg = undefined) { - return this._getSubject(trackAttribute, () => { - const sub = new SubjectType(subjectArg); - const cb = cbBuilder(sub); - cb(); - this.callables.push(cb); - return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); + _getDistinctObservableFromPromise(subjectName, promiseCB, cb) { + return this._getObservable(subjectName, () => { + let observable = this.triggerSubject.pipe( + exhaustMap(() => from(promiseCB())), + distinctUntilChanged((a, b) => equal(a, b)) + ); + + if(cb){ + observable = observable.pipe(map(x => { + cb(x); + return x; + })); + } + + return observable.pipe(shareReplay({refCount: true, bufferSize: 1})); }); } @@ -190,11 +194,14 @@ export default class Subspace { eventName, filterConditions }); - return this._getSubject(subjectHash, () => { - let deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks; - let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId); - returnSub.map = prop => { + return this._getObservable(subjectHash, () => { + const deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks; + const [subject, ethSubscription] = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId); + + // TODO: remove eth subscription + + subject.map = prop => { return returnSub.pipe( map(x => { if (typeof prop === "string") { @@ -211,7 +218,7 @@ export default class Subspace { ); }; - return returnSub; + return subject; }); } @@ -219,13 +226,20 @@ export default class Subspace { if (!this.isWebsocketProvider) console.warn("This method only works with websockets"); const subjectHash = hash({inputsABI, options}); - return this._getSubject(subjectHash, () => - this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId) - ); + return this._getObservable(subjectHash, () => { + const [subject, ethSubscription] = this.logSyncer.track( + options, + inputsABI, + this.latestBlockNumber - this.options.refreshLastNBlocks, + this.networkId + ); + // TODO: remove eth subscription + return subject; + }); } trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) { - const subjectHash = hash({ + const identifier = hash({ address: contractInstance.options.address, networkId: this.networkId, propName, @@ -233,156 +247,86 @@ export default class Subspace { callArgs }); - return this._getSubject(subjectHash, () => { - const subject = new ReplaySubject(1); - + const observable = this._getDistinctObservableFromPromise(identifier, () => { if (!Array.isArray(methodArgs)) { methodArgs = [methodArgs]; } - const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs); - - const callContractMethod = () => { - method.call.apply(method.call, [ - callArgs, - (err, result) => { - if (err) { - subject.error(err); - return; - } - subject.next(result); - } - ]); - }; - - callContractMethod(); - - this.callables.push(callContractMethod); - - const returnSub = subject.pipe(distinctUntilChanged((a, b) => equal(a, b))); - - returnSub.map = prop => { - return returnSub.pipe( - map(x => { - if (typeof prop === "string") { - return x[prop]; - } - if (Array.isArray(prop)) { - let newValues = {}; - prop.forEach(p => { - newValues[p] = x[p]; - }); - return newValues; - } - }) - ); - }; - - return returnSub; + return method.call.apply(method.call, [callArgs]); }); + + observable.map = prop => { + return observable.pipe( + map(x => { + if (typeof prop === "string") { + return x[prop]; + } + if (Array.isArray(prop)) { + let newValues = {}; + prop.forEach(p => { + newValues[p] = x[p]; + }); + return newValues; + } + }) + ); + }; + + return observable; } trackBalance(address, erc20Address) { if (!isAddress(address)) throw "invalid address"; if (erc20Address && !isAddress(erc20Address)) throw "invalid ERC20 contract address"; - const subjectHash = hash({address, erc20Address}); - - const getETHBalance = cb => { - const fn = this.web3.getBalance; - fn.apply(fn, [address, cb]); - }; - - const getTokenBalance = cb => { - const fn = this.web3.call; - // balanceOf - const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(address); - fn.apply(fn, [{to: erc20Address, data}, cb]); - }; - - let callFn; - if (!erc20Address) { - callFn = subject => () => - getETHBalance((err, balance) => { - if (err) { - subject.error(err); - return; - } - subject.next(balance); - }); - } else { - callFn = subject => () => - getTokenBalance((err, balance) => { - if (err) { - subject.error(err); - return; - } - subject.next(hexToDec(balance)); - }); - } - - return this._addDistinctCallable(subjectHash, callFn, ReplaySubject, 1); - } - - trackBlock() { - const blockCB = subject => () => { - this.web3 - .getBlock("latest") - .then(block => { - if (this.latest10Blocks[this.latest10Blocks.length - 1].number === block.number) return; - - this.latest10Blocks.push(block); - if (this.latest10Blocks.length > 10) { - this.latest10Blocks.shift(); - } - subject.next(block); - }) - .catch(error => subject.error(error)); - }; - - return this._addDistinctCallable( - "blockObservable", - blockCB, - BehaviorSubject, - this.latest10Blocks[this.latest10Blocks.length - 1] - ); + address = toChecksumAddress(address); + erc20Address = toChecksumAddress(address); + + return this._getDistinctObservableFromPromise(hash({address, erc20Address}), () => { + if (!erc20Address) { + return this.web3.getBalance(address); + } else { + // balanceOf + const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(address); + return new Promise((resolve, reject) => this.web3.call({to: erc20Address, data}).then(balance => resolve(hexToDec(balance))).catch(reject)); + } + }); } trackBlockNumber() { - const blockNumberCB = subject => () => { - this.web3 - .getBlockNumber() - .then(result => subject.next(result)) - .catch(error => subject.error(error)); - }; - return this._addDistinctCallable("blockNumberObservable", blockNumberCB, ReplaySubject, 1); + return this._getDistinctObservableFromPromise("blockNumber", () => this.web3.getBlockNumber()); + } + + trackBlock() { + return this._getDistinctObservableFromPromise("gasPrice", () => this.web3.getBlock("latest"), block => { + if (this.latest10Blocks[this.latest10Blocks.length - 1].number === block.number) return; + this.latest10Blocks.push(block); + if (this.latest10Blocks.length > 10) { + this.latest10Blocks.shift(); + } + }); } trackGasPrice() { - const gasPriceCB = subject => () => { - this.web3 - .getGasPrice() - .then(result => subject.next(result)) - .catch(error => subject.error(error)); - }; - return this._addDistinctCallable("gasPriceObservable", gasPriceCB, ReplaySubject, 1); + return this._getDistinctObservableFromPromise("gasPrice", () => this.web3.getGasPrice()); } trackAverageBlocktime() { - this.trackBlock(); - - const calcAverage = () => { - const times = []; - for (let i = 1; i < this.latest10Blocks.length; i++) { - let time = this.latest10Blocks[i].timestamp - this.latest10Blocks[i - 1].timestamp; - times.push(time); - } - return times.length ? Math.round(times.reduce((a, b) => a + b) / times.length) * 1000 : 0; - }; - - const avgTimeCB = subject => () => subject.next(calcAverage()); - - return this._addDistinctCallable("blockTimeObservable", avgTimeCB, BehaviorSubject, calcAverage()); + return this._getObservable("avgBlockTime", () => { + const calcAverage = () => { + const times = []; + for (let i = 1; i < this.latest10Blocks.length; i++) { + let time = this.latest10Blocks[i].timestamp - this.latest10Blocks[i - 1].timestamp; + times.push(time); + } + return times.length ? Math.round(times.reduce((a, b) => a + b) / times.length) * 1000 : 0; + }; + + return this.trackBlock().pipe( + map(() => calcAverage()), + distinctUntilChanged((a, b) => equal(a, b)) + ); + }); } close() { @@ -390,6 +334,5 @@ export default class Subspace { if (this.newBlocksSubscription) this.newBlocksSubscription.unsubscribe(); this.eventSyncer.close(); this.intervalTracker = null; - this.callables = []; } } diff --git a/src/utils.js b/src/utils.js index 25f7fbf..d9440a3 100644 --- a/src/utils.js +++ b/src/utils.js @@ -2,6 +2,24 @@ export function isAddress(address) { return /^(0x)?[0-9a-fA-F]{40}$/i.test(address); } +export function toChecksumAddress(address) { + if (typeof address === "undefined") return ""; + + address = address.toLowerCase().replace(/^0x/i, ""); + var addressHash = utils.sha3(address).replace(/^0x/i, ""); + var checksumAddress = "0x"; + + for (var i = 0; i < address.length; i++) { + // If ith character is 9 to f then make it uppercase + if (parseInt(addressHash[i], 16) > 7) { + checksumAddress += address[i].toUpperCase(); + } else { + checksumAddress += address[i]; + } + } + return checksumAddress; +} + export function sleep(milliseconds) { return new Promise(resolve => setTimeout(resolve, milliseconds)); }