Merge pull request #35 from status-im/feat--survive-reorgs

feat: survive reorgs
This commit is contained in:
Iuri Matias 2019-10-01 14:38:41 -04:00 committed by GitHub
commit 368e4a083f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 28 deletions

View File

@ -33,28 +33,21 @@ class Database {
this.db = new loki(dbFilename, { this.db = new loki(dbFilename, {
autoload: true, autoload: true,
autoloadCallback: () => { autoloadCallback: () => {
this.databaseInitialize(cb) this.databaseInitialize()
}, },
autosave: true, autosave: true,
env: getENV(), env: getENV(),
autosaveInterval: 2000 autosaveInterval: 2000
}) });
this.events = events; this.events = events;
} }
databaseInitialize(cb) { databaseInitialize(cb) {
let children = this.db.getCollection('children')
if (!children) {
children = this.db.addCollection('children')
this.db.saveDatabase()
}
let dbChanges = fromEvent(this.events, "updateDB") let dbChanges = fromEvent(this.events, "updateDB")
dbChanges.subscribe(() => { dbChanges.subscribe(() => {
this.db.saveDatabase() this.db.saveDatabase()
}) })
cb();
} }
getLastKnownEvent(eventKey) { getLastKnownEvent(eventKey) {
@ -71,8 +64,8 @@ class Database {
} }
return { return {
firstKnownBlock, firstKnownBlock: firstKnownBlock || 0,
lastKnownBlock lastKnownBlock: lastKnownBlock || 0
}; };
} }
@ -92,6 +85,18 @@ class Database {
children.insert(values); children.insert(values);
} }
deleteEvent(eventKey, eventId) {
const collection = this.db.getCollection(eventKey);
if(collection)
collection.chain().find({ 'id': eventId }).remove();
}
deleteNewestBlocks(eventKey, gteBlockNum) {
const collection = this.db.getCollection(eventKey);
if(collection)
collection.chain().find({ 'blockNumber': {'$gte': gteBlockNum}}).remove();
}
} }
export default Database; export default Database;

View File

@ -10,10 +10,12 @@ class EventSyncer {
this.subscriptions = []; this.subscriptions = [];
} }
track(contractInstance, eventName, filterConditionsOrCb){ track(contractInstance, eventName, filterConditionsOrCb, gteBlockNum){
const isFilterFunction = typeof filterConditionsOrCb === 'function'; const isFilterFunction = typeof filterConditionsOrCb === 'function';
const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {})))); const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}))));
this.db.deleteNewestBlocks(eventKey, gteBlockNum);
let filterConditions = {fromBlock: 0, toBlock: "latest"}; let filterConditions = {fromBlock: 0, toBlock: "latest"};
let filterConditionsCb; let filterConditionsCb;
if (isFilterFunction) { if (isFilterFunction) {
@ -29,17 +31,27 @@ class EventSyncer {
contractObserver.subscribe((e) => { contractObserver.subscribe((e) => {
if(!e) return; if(!e) return;
const id = hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex});
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions // TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
const eventData = { const eventData = {
id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}), id,
returnValues: {...e.returnValues}, returnValues: {...e.returnValues},
blockNumber: e.blockNumber, blockNumber: e.blockNumber,
transactionIndex: e.transactionIndex, transactionIndex: e.transactionIndex,
logIndex: e.logIndex logIndex: e.logIndex,
removed: e.removed
} }
sub.next({blockNumber: e.blockNumber, ...e.returnValues}); // TODO: test reorgs
sub.next(eventData);
if(e.removed){
this.db.deleteEvent(eventKey, id);
return;
}
if (this.db.eventExists(eventKey, eventData.id)) return; if (this.db.eventExists(eventKey, eventData.id)) return;
@ -49,12 +61,12 @@ class EventSyncer {
}); });
const eth_subscribe = this._retrieveEvents(eventKey, const eth_subscribe = this._retrieveEvents(eventKey,
eventSummary.firstKnownBlock, eventSummary.firstKnownBlock,
eventSummary.lastKnownBlock, eventSummary.lastKnownBlock,
filterConditions, filterConditions,
filterConditionsCb, filterConditionsCb,
contractInstance, contractInstance,
eventName); eventName);
const og_subscribe = sub.subscribe; const og_subscribe = sub.subscribe;
sub.subscribe = (next, error, complete) => { sub.subscribe = (next, error, complete) => {
@ -146,7 +158,7 @@ class EventSyncer {
console.error(err); console.error(err);
return; return;
} }
if (filterConditions) { if (filterConditions) {
let propsToFilter = []; let propsToFilter = [];
for (let prop in filterConditions.filter) { for (let prop in filterConditions.filter) {

View File

@ -117,8 +117,12 @@ class LogSyncer {
_getPastEvents(filterConditions, eventKey) { _getPastEvents(filterConditions, eventKey) {
const cb = this._parseEventCBFactory(filterConditions, eventKey); const cb = this._parseEventCBFactory(filterConditions, eventKey);
this.web3.getPastLogs(options, (err, logs) => { this.web3.getPastLogs(options, (err, logs) => {
if(err) {
throw new Error(err);
}
logs.forEach(l => { logs.forEach(l => {
cb(err, l); cb(null, l);
}) })
}); });
} }
@ -131,8 +135,7 @@ class LogSyncer {
_parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => { _parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => {
if(err) { if(err) {
console.error(err); throw new Error(err);
return;
} }
if (filterConditions) { if (filterConditions) {

View File

@ -22,8 +22,10 @@ export default class Subspace {
this.web3 = new Web3Eth(provider); this.web3 = new Web3Eth(provider);
this.options = {}; this.options = {};
this.options.refreshLastNBlocks = options.refreshLastNBlocks || 12;
this.options.callInterval = options.callInterval || 0; this.options.callInterval = options.callInterval || 0;
this.options.dbFilename = options.dbFilename || 'subspace.db'; this.options.dbFilename = options.dbFilename || 'subspace.db';
this.latestBlockNumber = undefined;
this.newBlocksSubscription = null; this.newBlocksSubscription = null;
this.intervalTracker = null; this.intervalTracker = null;
@ -32,16 +34,21 @@ export default class Subspace {
init() { init() {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this._db = new Database(this.options.dbFilename, this.events, resolve); this._db = new Database(this.options.dbFilename, this.events);
this.db = this._db.db; this.db = this._db.db;
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db); this.eventSyncer = new EventSyncer(this.web3, this.events, this._db);
this.logSyncer = new LogSyncer(this.web3, this.events, this._db); this.logSyncer = new LogSyncer(this.web3, this.events, this._db);
this.web3.getBlock('latest').then(block => {
this.latestBlockNumber = block.number;
resolve();
})
}) })
} }
// TODO: get contract abi/address instead // TODO: get contract abi/address instead
trackEvent(contractInstance, eventName, filterConditionsOrCb) { trackEvent(contractInstance, eventName, filterConditionsOrCb) {
return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb); return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, this.latestBlockNumber - this.options.refreshLastNBlocks);
} }
clearDB(collection) { clearDB(collection) {