refactor eventSyner

This commit is contained in:
Iuri Matias 2019-08-20 14:14:55 -04:00
parent 40fb22f280
commit 327c8a3ec0
5 changed files with 135 additions and 105 deletions

71
src/database.js Normal file
View File

@ -0,0 +1,71 @@
const { fromEvent, interval, ReplaySubject } = require('rxjs');
const { throttle, distinctUntilChanged } = require('rxjs/operators');
const loki = require('lokijs');
class Database {
constructor(dbFilename, events, cb) {
this.db = new loki(dbFilename, {
autoload: true,
autoloadCallback: () => {
this.databaseInitialize(cb)
},
autosave: true,
autosaveInterval: 2000
})
this.events = events;
}
databaseInitialize(cb) {
let children = this.db.getCollection('children')
if (!children) {
children = this.db.addCollection('children')
this.db.saveDatabase()
}
let tracked = this.db.getCollection('tracked')
if (!tracked) {
tracked = this.db.addCollection('tracked')
this.db.saveDatabase()
}
let dbChanges = fromEvent(this.events, "updateDB")
dbChanges.pipe(throttle(val => interval(400))).subscribe(() => {
this.db.saveDatabase()
})
cb();
}
getLastKnownEvent(eventName) {
let tracked = this.db.getCollection('tracked');
let lastEvent = tracked.find({ "eventName": eventName })[0];
if (!lastEvent || lastEvent.length <= 0) {
tracked.insert({ "eventName": eventName, id: 0 });
lastEvent = tracked.find({ "eventName": eventName })[0];
}
return lastEvent;
}
updateEventId(eventName, eventId) {
let tracked = this.db.getCollection('tracked');
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = eventId));
}
getEventsFor(eventKey) {
let children = this.db.getCollection('children');
return children.find({ 'eventKey': eventKey });
}
eventExists(eventId) {
let children = this.db.getCollection('children');
return (children.find({ 'id': eventId }).length > 0);
}
recordEvent(values) {
let children = this.db.getCollection('children');
children.insert(values);
}
}
module.exports = Database;

View File

@ -1,53 +1,29 @@
const { fromEvent, interval, ReplaySubject } = require('rxjs'); const { fromEvent, interval, ReplaySubject } = require('rxjs');
const { throttle, distinctUntilChanged } = require('rxjs/operators'); const { throttle, distinctUntilChanged } = require('rxjs/operators');
const { randomString } = require('./utils.js'); const { randomString } = require('./utils.js');
const loki = require('lokijs'); const Database = require('./database.js');
const Events = require('events'); const Events = require('events');
const Web3 = require('web3');
class EventSyncer { class EventSyncer {
// TODO: pass provider instead of web3 object constructor(provider, dbFilename) {
constructor(web3) { this.events = new Events();
this.events = new Events; this.web3 = new Web3(provider);
this.web3 = web3; this.dbFilename = dbFilename || 'phoenix.db';
} }
// TODO: pass config on where to save database init() {
init(cb) { return new Promise((resolve, reject) => {
this.db = new loki('phoenix.db', { this._db = new Database(this.dbFilename, this.events, resolve);
autoload: true, this.db = this._db.db;
autoloadCallback: () => {
this.databaseInitialize(cb)
},
autosave: true,
autosaveInterval: 2000
}) })
} }
databaseInitialize(cb) { // TODO: get contract abi/address instead
let children = this.db.getCollection('children')
if (!children) {
children = this.db.addCollection('children')
this.db.saveDatabase()
}
let tracked = this.db.getCollection('tracked')
if (!tracked) {
tracked = this.db.addCollection('tracked')
this.db.saveDatabase()
}
let dbChanges = fromEvent(this.events, "updateDB")
dbChanges.pipe(throttle(val => interval(400))).subscribe(() => {
this.db.saveDatabase()
})
cb();
}
trackEvent(contractInstance, eventName, filterConditionsOrCb) { trackEvent(contractInstance, eventName, filterConditionsOrCb) {
// let eventKey = eventName + "-from0x123"; // let eventKey = eventName + "-from0x123";
let eventKey = eventName; let eventKey = eventName;
let namespace = randomString()
let filterConditions, filterConditionsCb; let filterConditions, filterConditionsCb;
if (typeof filterConditionsOrCb === 'function') { if (typeof filterConditionsOrCb === 'function') {
@ -56,21 +32,15 @@ class EventSyncer {
filterConditions = filterConditionsOrCb filterConditions = filterConditionsOrCb
} }
let tracked = this.db.getCollection('tracked') // TODO: should use this to resume events tracking
let lastEvent = tracked.find({ "eventName": eventName })[0] // let lastEvent = this._db.getLastKnownEvent(eventName)
if (!lastEvent || lastEvent.length <= 0) {
tracked.insert({ "eventName": eventName, id: 0 })
lastEvent = tracked.find({ "eventName": eventName })[0]
}
let sub = new ReplaySubject(); let sub = new ReplaySubject();
let children = this.db.getCollection('children') this._db.getEventsFor(eventKey).forEach(sub.next);
for (let previous of children.find({ 'eventKey': eventKey })) {
sub.next(previous)
}
let contractObserver = fromEvent(this.events, "event-" + eventName + "-" + namespace) let eventbusKey = "event-" + eventName + "-" + randomString();
let contractObserver = fromEvent(this.events, eventbusKey)
// TODO: this should be moved to a 'smart' module // TODO: this should be moved to a 'smart' module
// for e.g, it should start fromBlock, from the latest known block (which means it should store block info) // for e.g, it should start fromBlock, from the latest known block (which means it should store block info)
@ -83,30 +53,24 @@ class EventSyncer {
propsToFilter.push(prop) propsToFilter.push(prop)
} }
} }
if (propsToFilter.length === 0) {
return this.events.emit("event-" + eventName + "-" + namespace, event);
}
for (let prop of propsToFilter) { for (let prop of propsToFilter) {
if (filterConditions.filter[prop] !== event.returnValues[prop]) return; if (filterConditions.filter[prop] !== event.returnValues[prop]) return;
} }
} else if (filterConditionsCb) { } else if (filterConditionsCb && !filterConditionsCb(event.returnValues)) {
if (!filterConditionsCb(event.returnValues)) {
return; return;
} }
}
this.events.emit("event-" + eventName + "-" + namespace, event); this.events.emit(eventbusKey, event);
}]) }])
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
contractObserver.pipe().subscribe((e) => { contractObserver.pipe().subscribe((e) => {
e.eventKey = eventKey e.eventKey = eventKey
if (children.find({ 'id': e.id }).length > 0) return; if (this._db.eventExists(e.id)) return;
if (e.returnValues['$loki']) return sub.next(e.returnValues) if (e.returnValues['$loki']) return sub.next(e.returnValues)
children.insert(e.returnValues) this._db.recordEvent(e.returnValues);
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) this._db.updateEventId(eventName, e.id)
this.events.emit("updateDB") this.events.emit("updateDB")
sub.next(e.returnValues) sub.next(e.returnValues)
}) })
@ -125,7 +89,7 @@ class EventSyncer {
sub.next(result) sub.next(result)
}]) }])
this.web3.eth.subscribe('newBlockHeaders', function (error, result) { this.web3.eth.subscribe('newBlockHeaders', (error, result) => {
method.call.apply(method.call, [({}), (err, result) => { method.call.apply(method.call, [({}), (err, result) => {
sub.next(result) sub.next(result)
}]) }])

View File

@ -90,9 +90,9 @@ async function run() {
}) })
const EventSyncer = require('../src/eventSyncer.js') const EventSyncer = require('../src/eventSyncer.js')
const eventSyncer = new EventSyncer(web3); const eventSyncer = new EventSyncer(web3.currentProvider);
eventSyncer.init(() => { await eventSyncer.init()
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => {
@ -118,7 +118,6 @@ async function run() {
console.dir(v) console.dir(v)
}); });
});
// await RatingContract.methods.doRating(1, 5).send({from: accounts[0]}) // await RatingContract.methods.doRating(1, 5).send({from: accounts[0]})
// await RatingContract.methods.doRating(1, 3).send({from: accounts[0]}) // await RatingContract.methods.doRating(1, 3).send({from: accounts[0]})

View File

@ -99,9 +99,9 @@ async function run() {
}) })
const EventSyncer = require('../src/eventSyncer.js') const EventSyncer = require('../src/eventSyncer.js')
const eventSyncer = new EventSyncer(web3); const eventSyncer = new EventSyncer(web3.currentProvider);
eventSyncer.init(() => { await eventSyncer.init()
console.dir("getting escrows created by " + accounts[0]) console.dir("getting escrows created by " + accounts[0])
// eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => { // eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => {
@ -111,8 +111,6 @@ async function run() {
console.dir(v) console.dir(v)
}); });
});
} }
run() run()

View File

@ -171,9 +171,9 @@ async function run() {
// }) // })
const EventSyncer = require('../src/eventSyncer.js') const EventSyncer = require('../src/eventSyncer.js')
const eventSyncer = new EventSyncer(web3); const eventSyncer = new EventSyncer(web3.currentProvider);
eventSyncer.init(() => { await eventSyncer.init();
// eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => { // eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => {
// eventSyncer.trackEvent(EscrowContract, 'Created', {filter: {buyer: accounts[0]}, fromBlock: 1}).pipe().subscribe((v) => { // eventSyncer.trackEvent(EscrowContract, 'Created', {filter: {buyer: accounts[0]}, fromBlock: 1}).pipe().subscribe((v) => {
@ -187,8 +187,6 @@ async function run() {
console.dir(v) console.dir(v)
}) })
});
} }
run() run()