feat: survive reorgs

This commit is contained in:
Richard Ramos 2019-10-01 14:36:35 -04:00
parent 7a766ab26c
commit 11a03e8773
4 changed files with 55 additions and 28 deletions

View File

@ -33,28 +33,21 @@ class Database {
this.db = new loki(dbFilename, {
autoload: true,
autoloadCallback: () => {
this.databaseInitialize(cb)
this.databaseInitialize()
},
autosave: true,
env: getENV(),
autosaveInterval: 2000
})
});
this.events = events;
}
databaseInitialize(cb) {
let children = this.db.getCollection('children')
if (!children) {
children = this.db.addCollection('children')
this.db.saveDatabase()
}
let dbChanges = fromEvent(this.events, "updateDB")
dbChanges.subscribe(() => {
this.db.saveDatabase()
})
cb();
}
getLastKnownEvent(eventKey) {
@ -71,8 +64,8 @@ class Database {
}
return {
firstKnownBlock,
lastKnownBlock
firstKnownBlock: firstKnownBlock || 0,
lastKnownBlock: lastKnownBlock || 0
};
}
@ -92,6 +85,18 @@ class Database {
children.insert(values);
}
deleteEvent(eventKey, eventId) {
const collection = this.db.getCollection(eventKey);
if(collection)
collection.chain().find({ 'id': eventId }).remove();
}
deleteNewestBlocks(eventKey, gteBlockNum) {
const collection = this.db.getCollection(eventKey);
if(collection)
collection.chain().find({ 'blockNumber': {'$gte': gteBlockNum}}).remove();
}
}
export default Database;

View File

@ -10,10 +10,12 @@ class EventSyncer {
this.subscriptions = [];
}
track(contractInstance, eventName, filterConditionsOrCb){
track(contractInstance, eventName, filterConditionsOrCb, gteBlockNum){
const isFilterFunction = typeof filterConditionsOrCb === 'function';
const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}))));
this.db.deleteNewestBlocks(eventKey, gteBlockNum);
let filterConditions = {fromBlock: 0, toBlock: "latest"};
let filterConditionsCb;
if (isFilterFunction) {
@ -29,17 +31,27 @@ class EventSyncer {
contractObserver.subscribe((e) => {
if(!e) return;
const id = hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex});
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
const eventData = {
id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}),
id,
returnValues: {...e.returnValues},
blockNumber: e.blockNumber,
transactionIndex: e.transactionIndex,
logIndex: e.logIndex
logIndex: e.logIndex,
removed: e.removed
}
sub.next({blockNumber: e.blockNumber, ...e.returnValues});
// TODO: test reorgs
sub.next(eventData);
if(e.removed){
this.db.deleteEvent(eventKey, id);
return;
}
if (this.db.eventExists(eventKey, eventData.id)) return;
@ -49,12 +61,12 @@ class EventSyncer {
});
const eth_subscribe = this._retrieveEvents(eventKey,
eventSummary.firstKnownBlock,
eventSummary.lastKnownBlock,
filterConditions,
filterConditionsCb,
contractInstance,
eventName);
eventSummary.firstKnownBlock,
eventSummary.lastKnownBlock,
filterConditions,
filterConditionsCb,
contractInstance,
eventName);
const og_subscribe = sub.subscribe;
sub.subscribe = (next, error, complete) => {
@ -146,7 +158,7 @@ class EventSyncer {
console.error(err);
return;
}
if (filterConditions) {
let propsToFilter = [];
for (let prop in filterConditions.filter) {

View File

@ -117,8 +117,12 @@ class LogSyncer {
_getPastEvents(filterConditions, eventKey) {
const cb = this._parseEventCBFactory(filterConditions, eventKey);
this.web3.getPastLogs(options, (err, logs) => {
if(err) {
throw new Error(err);
}
logs.forEach(l => {
cb(err, l);
cb(null, l);
})
});
}
@ -131,8 +135,7 @@ class LogSyncer {
_parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => {
if(err) {
console.error(err);
return;
throw new Error(err);
}
if (filterConditions) {

View File

@ -22,8 +22,10 @@ export default class Subspace {
this.web3 = new Web3Eth(provider);
this.options = {};
this.options.refreshLastNBlocks = options.refreshLastNBlocks || 12;
this.options.callInterval = options.callInterval || 0;
this.options.dbFilename = options.dbFilename || 'subspace.db';
this.latestBlockNumber = undefined;
this.newBlocksSubscription = null;
this.intervalTracker = null;
@ -32,16 +34,21 @@ export default class Subspace {
init() {
return new Promise((resolve, reject) => {
this._db = new Database(this.options.dbFilename, this.events, resolve);
this._db = new Database(this.options.dbFilename, this.events);
this.db = this._db.db;
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db);
this.logSyncer = new LogSyncer(this.web3, this.events, this._db);
this.web3.getBlock('latest').then(block => {
this.latestBlockNumber = block.number;
resolve();
})
})
}
// TODO: get contract abi/address instead
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb);
return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, this.latestBlockNumber - this.options.refreshLastNBlocks);
}
clearDB(collection) {