From 454cfdecfd6b8c9869701d949b9a3372fdd7b823 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sun, 8 Sep 2019 21:40:54 -0400 Subject: [PATCH] refactor: extract event tracking to file (#30) --- src/eventSyncer.js | 189 +++++++-------------------------------------- src/index.js | 2 +- src/subspace.js | 142 ++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 160 deletions(-) create mode 100644 src/subspace.js diff --git a/src/eventSyncer.js b/src/eventSyncer.js index aee4482..38672b1 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -1,57 +1,34 @@ import { fromEvent, ReplaySubject } from 'rxjs'; -import { distinctUntilChanged } from 'rxjs/operators'; -import equal from 'fast-deep-equal'; import hash from 'object-hash'; -import Database from './database.js'; -import Events from 'events'; -import Web3Eth from 'web3-eth'; -import stripHexPrefix from 'strip-hex-prefix'; -import toBN from 'number-to-bn'; -export default class EventSyncer { - - constructor(provider, options = {}) { - this.events = new Events(); - this.web3 = new Web3Eth(provider); - - this.options = {}; - this.options.callInterval = options.callInterval || 0; - this.options.dbFilename = options.dbFilename || 'phoenix.db'; - - this.newBlocksSubscription = null; - this.intervalTracker = null; - this.callables = []; +class EventSyncer { + constructor(web3, events, db) { + this.events = events; + this.web3 = web3; + this.db = db; } - init() { - return new Promise((resolve, reject) => { - this._db = new Database(this.options.dbFilename, this.events, resolve); - this.db = this._db.db; - }) - } - - // TODO: get contract abi/address instead - trackEvent(contractInstance, eventName, filterConditionsOrCb) { - let eventKey = eventName + '-' + hash(filterConditionsOrCb); + track(contractInstance, eventName, filterConditionsOrCb){ + const isFilterFunction = typeof filterConditionsOrCb === 'function'; + const eventKey = eventName + '-' + hash(isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {})); let filterConditions = {fromBlock: 0, toBlock: "latest"}; let filterConditionsCb; - if (typeof filterConditionsOrCb === 'function') { + if (isFilterFunction) { filterConditionsCb = filterConditionsOrCb; } else { filterConditions = Object.assign(filterConditions, filterConditionsOrCb || {}); } - - let eventSummary = this._db.getLastKnownEvent(eventKey); + + let eventSummary = this.db.getLastKnownEvent(eventKey); let sub = new ReplaySubject(); let contractObserver = fromEvent(this.events, eventKey) contractObserver.subscribe((e) => { if(!e) return; - - // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions - + + // TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions const eventData = { id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}), returnValues: {...e.returnValues}, @@ -62,9 +39,9 @@ export default class EventSyncer { sub.next({blockNumber: e.blockNumber, ...e.returnValues}); - if (this._db.eventExists(eventKey, eventData.id)) return; + if (this.db.eventExists(eventKey, eventData.id)) return; - this._db.recordEvent(eventKey, eventData); + this.db.recordEvent(eventKey, eventData); this.events.emit("updateDB"); }); @@ -80,12 +57,11 @@ export default class EventSyncer { return sub; } - _retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) { // TODO: this should be moved to a 'smart' module // it should be able to do events X at the time to avoid slow downs as well as the 10k limit // TODO: filter subscriptions with fromBlock and toBlock - + if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) { if (filterConditions.toBlock === 'latest') { // emit DB Events [fromBlock, lastKnownBlock] @@ -131,32 +107,31 @@ export default class EventSyncer { } } } - + _serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) { const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey); - const storedEvents = this._db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock); + const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock); storedEvents.forEach(ev => { cb(null, ev); }); } - + _getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) { const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey); contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => { - events.forEach(ev => { - cb(err, ev); - }); - } ]); + events.forEach(ev => { + cb(err, ev); + }); + }]); } - + _subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) { - event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]); + event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]); } - - + _parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => { if(err) return; - + if (filterConditions) { let propsToFilter = []; for (let prop in filterConditions.filter) { @@ -165,118 +140,14 @@ export default class EventSyncer { } } for (let prop of propsToFilter) { - if (filterConditions.filter[prop] !== ev.returnValues[prop]) - return; + if (filterConditions.filter[prop] !== ev.returnValues[prop]) return; } } else if (filterConditionsCb && !filterConditionsCb(ev.returnValues)) { return; } - this.events.emit(eventKey, ev); } +} - _initNewBlocksSubscription() { - if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return; - - this.newBlocksSubscription = this.web3.subscribe('newBlockHeaders', (err, result) => { - if(err) { - sub.error(err); - return; - } - - this.callables.forEach(fn => { - fn(); - }); - }); - } - - _initCallInterval() { - if(this.intervalTracker != null || this.options.callInterval === 0) return; - - this.intervalTracker = setInterval(() => { - this.callables.forEach(fn => { - fn(); - }); - }, this.options.callInterval); - - } - - // TODO: should save value in database? - trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) { - - const sub = new ReplaySubject(); - - const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs) - const callContractMethod = () => { - method.call.apply(method.call, [callArgs, (err, result) => { - if(err) { - sub.error(err); - return; - } - sub.next(result); - }]); - }; - - callContractMethod(); - - this._initNewBlocksSubscription(); - this._initCallInterval(); - - this.callables.push(callContractMethod); - - return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); - } - - // TODO: should save value in database? - trackBalance(address, erc20Address) { - const sub = new ReplaySubject(); - - // TODO: validate address? - - let callFn; - if(!erc20Address){ - callFn = () => { - const fn = this.web3.getBalance; - - fn.apply(fn, [address, (err, balance) => { - if(err) { - sub.error(err); - return; - } - sub.next(balance); - }]); - }; - } else { - callFn = () => { - const fn = this.web3.call; - // balanceOf - const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(erc20Address); - fn.apply(fn, [{to: erc20Address, data}, (err, result) => { - if(err) { - sub.error(err); - return; - } - sub.next(toBN(result).toString(10)); - }]); - }; - } - - callFn(); - - this._initNewBlocksSubscription(); - this._initCallInterval(); - - this.callables.push(callFn); - - return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); - } - - clean(){ - clearInterval(this.intervalTracker); - this.newBlocksSubscription.unsubscribe(); - this.intervalTracker = null; - this.callables = []; - } - -} \ No newline at end of file +export default EventSyncer; diff --git a/src/index.js b/src/index.js index 63ce4fd..a1a765b 100644 --- a/src/index.js +++ b/src/index.js @@ -1,2 +1,2 @@ -export {default} from './eventSyncer.js'; +export {default} from './subspace'; export * from './operators'; diff --git a/src/subspace.js b/src/subspace.js new file mode 100644 index 0000000..21a2d12 --- /dev/null +++ b/src/subspace.js @@ -0,0 +1,142 @@ +import { ReplaySubject } from 'rxjs'; +import { distinctUntilChanged } from 'rxjs/operators'; +import equal from 'fast-deep-equal'; +import Database from './database.js'; +import Events from 'events'; +import Web3Eth from 'web3-eth'; +import stripHexPrefix from 'strip-hex-prefix'; +import toBN from 'number-to-bn'; +import EventSyncer from './eventSyncer'; + +export default class Subspace { + + constructor(provider, options = {}) { + this.events = new Events(); + this.web3 = new Web3Eth(provider); + + this.options = {}; + this.options.callInterval = options.callInterval || 0; + this.options.dbFilename = options.dbFilename || 'phoenix.db'; + + this.newBlocksSubscription = null; + this.intervalTracker = null; + this.callables = []; + } + + init() { + return new Promise((resolve, reject) => { + this._db = new Database(this.options.dbFilename, this.events, resolve); + this.db = this._db.db; + }) + } + + // TODO: get contract abi/address instead + trackEvent(contractInstance, eventName, filterConditionsOrCb) { + const eventSyncer = new EventSyncer(this.web3, this.events, this._db); + return eventSyncer.track(contractInstance, eventName, filterConditionsOrCb); + } + + _initNewBlocksSubscription() { + if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return; + + this.newBlocksSubscription = this.web3.subscribe('newBlockHeaders', (err, result) => { + if(err) { + sub.error(err); + return; + } + + this.callables.forEach(fn => { + fn(); + }); + }); + } + + _initCallInterval() { + if(this.intervalTracker != null || this.options.callInterval === 0) return; + + this.intervalTracker = setInterval(() => { + this.callables.forEach(fn => { + fn(); + }); + }, this.options.callInterval); + + } + + // TODO: should save value in database? + trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) { + + const sub = new ReplaySubject(); + + const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs) + const callContractMethod = () => { + method.call.apply(method.call, [callArgs, (err, result) => { + if(err) { + sub.error(err); + return; + } + sub.next(result); + }]); + }; + + callContractMethod(); + + this._initNewBlocksSubscription(); + this._initCallInterval(); + + this.callables.push(callContractMethod); + + return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); + } + + // TODO: should save value in database? + trackBalance(address, erc20Address) { + const sub = new ReplaySubject(); + + // TODO: validate address? + + let callFn; + if(!erc20Address){ + callFn = () => { + const fn = this.web3.getBalance; + + fn.apply(fn, [address, (err, balance) => { + if(err) { + sub.error(err); + return; + } + sub.next(balance); + }]); + }; + } else { + callFn = () => { + const fn = this.web3.call; + // balanceOf + const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(erc20Address); + fn.apply(fn, [{to: erc20Address, data}, (err, result) => { + if(err) { + sub.error(err); + return; + } + sub.next(toBN(result).toString(10)); + }]); + }; + } + + callFn(); + + this._initNewBlocksSubscription(); + this._initCallInterval(); + + this.callables.push(callFn); + + return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); + } + + clean(){ + clearInterval(this.intervalTracker); + this.newBlocksSubscription.unsubscribe(); + this.intervalTracker = null; + this.callables = []; + } + +} \ No newline at end of file