From 327c8a3ec0c0fd570db154211d92fe8ca30f009f Mon Sep 17 00:00:00 2001 From: Iuri Matias Date: Tue, 20 Aug 2019 14:14:55 -0400 Subject: [PATCH] refactor eventSyner --- src/database.js | 71 +++++++++++++++++++++++++++++++++++++++ src/eventSyncer.js | 82 +++++++++++++--------------------------------- test/test1.js | 47 +++++++++++++------------- test/test2.js | 14 ++++---- test/test3.js | 26 +++++++-------- 5 files changed, 135 insertions(+), 105 deletions(-) create mode 100644 src/database.js diff --git a/src/database.js b/src/database.js new file mode 100644 index 0000000..d93b381 --- /dev/null +++ b/src/database.js @@ -0,0 +1,71 @@ +const { fromEvent, interval, ReplaySubject } = require('rxjs'); +const { throttle, distinctUntilChanged } = require('rxjs/operators'); +const loki = require('lokijs'); + +class Database { + + constructor(dbFilename, events, cb) { + this.db = new loki(dbFilename, { + autoload: true, + autoloadCallback: () => { + this.databaseInitialize(cb) + }, + autosave: true, + autosaveInterval: 2000 + }) + this.events = events; + } + + databaseInitialize(cb) { + let children = this.db.getCollection('children') + if (!children) { + children = this.db.addCollection('children') + this.db.saveDatabase() + } + let tracked = this.db.getCollection('tracked') + if (!tracked) { + tracked = this.db.addCollection('tracked') + this.db.saveDatabase() + } + + let dbChanges = fromEvent(this.events, "updateDB") + dbChanges.pipe(throttle(val => interval(400))).subscribe(() => { + this.db.saveDatabase() + }) + + cb(); + } + + getLastKnownEvent(eventName) { + let tracked = this.db.getCollection('tracked'); + let lastEvent = tracked.find({ "eventName": eventName })[0]; + if (!lastEvent || lastEvent.length <= 0) { + tracked.insert({ "eventName": eventName, id: 0 }); + lastEvent = tracked.find({ "eventName": eventName })[0]; + } + return lastEvent; + } + + updateEventId(eventName, eventId) { + let tracked = this.db.getCollection('tracked'); + tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = eventId)); + } + + getEventsFor(eventKey) { + let children = this.db.getCollection('children'); + return children.find({ 'eventKey': eventKey }); + } + + eventExists(eventId) { + let children = this.db.getCollection('children'); + return (children.find({ 'id': eventId }).length > 0); + } + + recordEvent(values) { + let children = this.db.getCollection('children'); + children.insert(values); + } + +} + +module.exports = Database; \ No newline at end of file diff --git a/src/eventSyncer.js b/src/eventSyncer.js index fdb003c..225a7f6 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -1,53 +1,29 @@ const { fromEvent, interval, ReplaySubject } = require('rxjs'); const { throttle, distinctUntilChanged } = require('rxjs/operators'); const { randomString } = require('./utils.js'); -const loki = require('lokijs'); +const Database = require('./database.js'); const Events = require('events'); +const Web3 = require('web3'); class EventSyncer { - // TODO: pass provider instead of web3 object - constructor(web3) { - this.events = new Events; - this.web3 = web3; + constructor(provider, dbFilename) { + this.events = new Events(); + this.web3 = new Web3(provider); + this.dbFilename = dbFilename || 'phoenix.db'; } - // TODO: pass config on where to save database - init(cb) { - this.db = new loki('phoenix.db', { - autoload: true, - autoloadCallback: () => { - this.databaseInitialize(cb) - }, - autosave: true, - autosaveInterval: 2000 + init() { + return new Promise((resolve, reject) => { + this._db = new Database(this.dbFilename, this.events, resolve); + this.db = this._db.db; }) } - databaseInitialize(cb) { - let children = this.db.getCollection('children') - if (!children) { - children = this.db.addCollection('children') - this.db.saveDatabase() - } - let tracked = this.db.getCollection('tracked') - if (!tracked) { - tracked = this.db.addCollection('tracked') - this.db.saveDatabase() - } - - let dbChanges = fromEvent(this.events, "updateDB") - dbChanges.pipe(throttle(val => interval(400))).subscribe(() => { - this.db.saveDatabase() - }) - - cb(); - } - + // TODO: get contract abi/address instead trackEvent(contractInstance, eventName, filterConditionsOrCb) { // let eventKey = eventName + "-from0x123"; let eventKey = eventName; - let namespace = randomString() let filterConditions, filterConditionsCb; if (typeof filterConditionsOrCb === 'function') { @@ -56,21 +32,15 @@ class EventSyncer { filterConditions = filterConditionsOrCb } - let tracked = this.db.getCollection('tracked') - let lastEvent = tracked.find({ "eventName": eventName })[0] - if (!lastEvent || lastEvent.length <= 0) { - tracked.insert({ "eventName": eventName, id: 0 }) - lastEvent = tracked.find({ "eventName": eventName })[0] - } + // TODO: should use this to resume events tracking + // let lastEvent = this._db.getLastKnownEvent(eventName) let sub = new ReplaySubject(); - let children = this.db.getCollection('children') - for (let previous of children.find({ 'eventKey': eventKey })) { - sub.next(previous) - } + this._db.getEventsFor(eventKey).forEach(sub.next); - let contractObserver = fromEvent(this.events, "event-" + eventName + "-" + namespace) + let eventbusKey = "event-" + eventName + "-" + randomString(); + let contractObserver = fromEvent(this.events, eventbusKey) // TODO: this should be moved to a 'smart' module // for e.g, it should start fromBlock, from the latest known block (which means it should store block info) @@ -83,30 +53,24 @@ class EventSyncer { propsToFilter.push(prop) } } - if (propsToFilter.length === 0) { - return this.events.emit("event-" + eventName + "-" + namespace, event); - } - for (let prop of propsToFilter) { if (filterConditions.filter[prop] !== event.returnValues[prop]) return; } - } else if (filterConditionsCb) { - if (!filterConditionsCb(event.returnValues)) { - return; - } + } else if (filterConditionsCb && !filterConditionsCb(event.returnValues)) { + return; } - this.events.emit("event-" + eventName + "-" + namespace, event); + this.events.emit(eventbusKey, event); }]) // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions contractObserver.pipe().subscribe((e) => { e.eventKey = eventKey - if (children.find({ 'id': e.id }).length > 0) return; + if (this._db.eventExists(e.id)) return; if (e.returnValues['$loki']) return sub.next(e.returnValues) - children.insert(e.returnValues) - tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) + this._db.recordEvent(e.returnValues); + this._db.updateEventId(eventName, e.id) this.events.emit("updateDB") sub.next(e.returnValues) }) @@ -125,7 +89,7 @@ class EventSyncer { sub.next(result) }]) - this.web3.eth.subscribe('newBlockHeaders', function (error, result) { + this.web3.eth.subscribe('newBlockHeaders', (error, result) => { method.call.apply(method.call, [({}), (err, result) => { sub.next(result) }]) diff --git a/test/test1.js b/test/test1.js index de7dcba..df2d106 100644 --- a/test/test1.js +++ b/test/test1.js @@ -90,36 +90,35 @@ async function run() { }) const EventSyncer = require('../src/eventSyncer.js') - const eventSyncer = new EventSyncer(web3); + const eventSyncer = new EventSyncer(web3.currentProvider); - eventSyncer.init(() => { + await eventSyncer.init() - // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions - // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { + // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions + // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { - // eventSyncer.trackEvent(RatingContract, 'Rating').pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { - // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => { - - eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { - console.dir("value is ") - console.dir(v) - }); - - var max = scan((acc, curr) => { - if (curr > acc) return curr; - return acc; - }, []) - - // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => { - // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), last()).subscribe((v) => { - - eventSyncer.trackEvent(RatingContract, 'Rating').pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => { - console.dir("max known rating is") - console.dir(v) - }); + // eventSyncer.trackEvent(RatingContract, 'Rating').pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { + // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => { + eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { + console.dir("value is ") + console.dir(v) }); + var max = scan((acc, curr) => { + if (curr > acc) return curr; + return acc; + }, []) + + // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => { + // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), last()).subscribe((v) => { + + eventSyncer.trackEvent(RatingContract, 'Rating').pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => { + console.dir("max known rating is") + console.dir(v) + }); + + // await RatingContract.methods.doRating(1, 5).send({from: accounts[0]}) // await RatingContract.methods.doRating(1, 3).send({from: accounts[0]}) // await RatingContract.methods.doRating(1, 1).send({from: accounts[0]}) diff --git a/test/test2.js b/test/test2.js index 872b11d..7a8299f 100644 --- a/test/test2.js +++ b/test/test2.js @@ -99,18 +99,16 @@ async function run() { }) const EventSyncer = require('../src/eventSyncer.js') - const eventSyncer = new EventSyncer(web3); + const eventSyncer = new EventSyncer(web3.currentProvider); - eventSyncer.init(() => { - console.dir("getting escrows created by " + accounts[0]) + await eventSyncer.init() + console.dir("getting escrows created by " + accounts[0]) // eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => { - eventSyncer.trackEvent(EscrowContract, 'Created', {filter: {buyer: accounts[0]}, fromBlock: 1}).pipe().subscribe((v) => { + eventSyncer.trackEvent(EscrowContract, 'Created', { filter: { buyer: accounts[0] }, fromBlock: 1 }).pipe().subscribe((v) => { // eventSyncer.trackEvent(EscrowContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => { - console.dir("value is ") - console.dir(v) - }); - + console.dir("value is ") + console.dir(v) }); } diff --git a/test/test3.js b/test/test3.js index 01a887d..061bab1 100644 --- a/test/test3.js +++ b/test/test3.js @@ -171,23 +171,21 @@ async function run() { // }) const EventSyncer = require('../src/eventSyncer.js') - const eventSyncer = new EventSyncer(web3); + const eventSyncer = new EventSyncer(web3.currentProvider); - eventSyncer.init(() => { + await eventSyncer.init(); - // eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => { - // eventSyncer.trackEvent(EscrowContract, 'Created', {filter: {buyer: accounts[0]}, fromBlock: 1}).pipe().subscribe((v) => { - // eventSyncer.trackEvent(EscrowContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => { - // console.dir("value is ") - // console.dir(v) - // }); + // eventSyncer.trackEvent(EscrowContract, 'Created', ((x) => true)).pipe().subscribe((v) => { + // eventSyncer.trackEvent(EscrowContract, 'Created', {filter: {buyer: accounts[0]}, fromBlock: 1}).pipe().subscribe((v) => { + // eventSyncer.trackEvent(EscrowContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => { + // console.dir("value is ") + // console.dir(v) + // }); - eventSyncer.trackProperty(SimpleStorageContract, 'get', ((x) => true)).pipe().subscribe((v) => { - console.dir("value is ") - console.dir(v) - }) - - }); + eventSyncer.trackProperty(SimpleStorageContract, 'get', ((x) => true)).pipe().subscribe((v) => { + console.dir("value is ") + console.dir(v) + }) }