From 11a03e8773ed9240122a7430c9ec28e604606da9 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 1 Oct 2019 14:36:35 -0400 Subject: [PATCH] feat: survive reorgs --- src/database.js | 29 +++++++++++++++++------------ src/eventSyncer.js | 34 +++++++++++++++++++++++----------- src/logSyncer.js | 9 ++++++--- src/subspace.js | 11 +++++++++-- 4 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/database.js b/src/database.js index 2e1ce90..96dfa6e 100644 --- a/src/database.js +++ b/src/database.js @@ -33,28 +33,21 @@ class Database { this.db = new loki(dbFilename, { autoload: true, autoloadCallback: () => { - this.databaseInitialize(cb) + this.databaseInitialize() }, autosave: true, env: getENV(), autosaveInterval: 2000 - }) + }); + this.events = events; } databaseInitialize(cb) { - let children = this.db.getCollection('children') - if (!children) { - children = this.db.addCollection('children') - this.db.saveDatabase() - } - let dbChanges = fromEvent(this.events, "updateDB") dbChanges.subscribe(() => { this.db.saveDatabase() }) - - cb(); } getLastKnownEvent(eventKey) { @@ -71,8 +64,8 @@ class Database { } return { - firstKnownBlock, - lastKnownBlock + firstKnownBlock: firstKnownBlock || 0, + lastKnownBlock: lastKnownBlock || 0 }; } @@ -92,6 +85,18 @@ class Database { children.insert(values); } + deleteEvent(eventKey, eventId) { + const collection = this.db.getCollection(eventKey); + if(collection) + collection.chain().find({ 'id': eventId }).remove(); + } + + deleteNewestBlocks(eventKey, gteBlockNum) { + const collection = this.db.getCollection(eventKey); + if(collection) + collection.chain().find({ 'blockNumber': {'$gte': gteBlockNum}}).remove(); + } + } export default Database; diff --git a/src/eventSyncer.js b/src/eventSyncer.js index 684b077..377b040 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -10,10 +10,12 @@ class EventSyncer { this.subscriptions = []; } - track(contractInstance, eventName, filterConditionsOrCb){ + track(contractInstance, eventName, filterConditionsOrCb, gteBlockNum){ const isFilterFunction = typeof filterConditionsOrCb === 'function'; const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {})))); + this.db.deleteNewestBlocks(eventKey, gteBlockNum); + let filterConditions = {fromBlock: 0, toBlock: "latest"}; let filterConditionsCb; if (isFilterFunction) { @@ -29,17 +31,27 @@ class EventSyncer { contractObserver.subscribe((e) => { if(!e) return; + + const id = hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}); // TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions const eventData = { - id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}), + id, returnValues: {...e.returnValues}, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, - logIndex: e.logIndex + logIndex: e.logIndex, + removed: e.removed } - sub.next({blockNumber: e.blockNumber, ...e.returnValues}); + // TODO: test reorgs + + sub.next(eventData); + + if(e.removed){ + this.db.deleteEvent(eventKey, id); + return; + } if (this.db.eventExists(eventKey, eventData.id)) return; @@ -49,12 +61,12 @@ class EventSyncer { }); const eth_subscribe = this._retrieveEvents(eventKey, - eventSummary.firstKnownBlock, - eventSummary.lastKnownBlock, - filterConditions, - filterConditionsCb, - contractInstance, - eventName); + eventSummary.firstKnownBlock, + eventSummary.lastKnownBlock, + filterConditions, + filterConditionsCb, + contractInstance, + eventName); const og_subscribe = sub.subscribe; sub.subscribe = (next, error, complete) => { @@ -146,7 +158,7 @@ class EventSyncer { console.error(err); return; } - + if (filterConditions) { let propsToFilter = []; for (let prop in filterConditions.filter) { diff --git a/src/logSyncer.js b/src/logSyncer.js index 359601c..41cf310 100644 --- a/src/logSyncer.js +++ b/src/logSyncer.js @@ -117,8 +117,12 @@ class LogSyncer { _getPastEvents(filterConditions, eventKey) { const cb = this._parseEventCBFactory(filterConditions, eventKey); this.web3.getPastLogs(options, (err, logs) => { + if(err) { + throw new Error(err); + } + logs.forEach(l => { - cb(err, l); + cb(null, l); }) }); } @@ -131,8 +135,7 @@ class LogSyncer { _parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => { if(err) { - console.error(err); - return; + throw new Error(err); } if (filterConditions) { diff --git a/src/subspace.js b/src/subspace.js index 27fefd4..0c79fae 100644 --- a/src/subspace.js +++ b/src/subspace.js @@ -22,8 +22,10 @@ export default class Subspace { this.web3 = new Web3Eth(provider); this.options = {}; + this.options.refreshLastNBlocks = options.refreshLastNBlocks || 12; this.options.callInterval = options.callInterval || 0; this.options.dbFilename = options.dbFilename || 'subspace.db'; + this.latestBlockNumber = undefined; this.newBlocksSubscription = null; this.intervalTracker = null; @@ -32,16 +34,21 @@ export default class Subspace { init() { return new Promise((resolve, reject) => { - this._db = new Database(this.options.dbFilename, this.events, resolve); + this._db = new Database(this.options.dbFilename, this.events); this.db = this._db.db; this.eventSyncer = new EventSyncer(this.web3, this.events, this._db); this.logSyncer = new LogSyncer(this.web3, this.events, this._db); + + this.web3.getBlock('latest').then(block => { + this.latestBlockNumber = block.number; + resolve(); + }) }) } // TODO: get contract abi/address instead trackEvent(contractInstance, eventName, filterConditionsOrCb) { - return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb); + return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, this.latestBlockNumber - this.options.refreshLastNBlocks); } clearDB(collection) {