diff --git a/src/eventSyncer.js b/src/eventSyncer.js index d303f28..fdb003c 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -1,20 +1,18 @@ const { fromEvent, interval, ReplaySubject } = require('rxjs'); -const { throttle, filter, distinctUntilChanged } = require('rxjs/operators'); -const loki = require('lokijs') -const Events = require('events') - -function randomString() { - return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); -} +const { throttle, distinctUntilChanged } = require('rxjs/operators'); +const { randomString } = require('./utils.js'); +const loki = require('lokijs'); +const Events = require('events'); class EventSyncer { + // TODO: pass provider instead of web3 object constructor(web3) { - // this.events = events; this.events = new Events; this.web3 = web3; } + // TODO: pass config on where to save database init(cb) { this.db = new loki('phoenix.db', { autoload: true, @@ -22,7 +20,7 @@ class EventSyncer { this.databaseInitialize(cb) }, autosave: true, - autosaveInterval: 2000 // save every four seconds for our example + autosaveInterval: 2000 }) } @@ -39,22 +37,18 @@ class EventSyncer { } let dbChanges = fromEvent(this.events, "updateDB") - dbChanges.pipe(throttle(val => interval(400))).subscribe(() => { - console.dir("saving database...") this.db.saveDatabase() }) cb(); } - // trackEvent(eventName, filterConditions) { trackEvent(contractInstance, eventName, filterConditionsOrCb) { // let eventKey = eventName + "-from0x123"; let eventKey = eventName; let namespace = randomString() - let filterConditions, filterConditionsCb; if (typeof filterConditionsOrCb === 'function') { filterConditionsCb = filterConditionsOrCb @@ -69,13 +63,10 @@ class EventSyncer { lastEvent = tracked.find({ "eventName": eventName })[0] } - console.dir("last id was " + lastEvent.id) - let sub = new ReplaySubject(); let children = this.db.getCollection('children') for (let previous of children.find({ 'eventKey': eventKey })) { - console.dir("checking previous event: " + previous.id) sub.next(previous) } @@ -97,9 +88,7 @@ class EventSyncer { } for (let prop of propsToFilter) { - if (filterConditions.filter[prop] !== event.returnValues[prop]) { - return; - } + if (filterConditions.filter[prop] !== event.returnValues[prop]) return; } } else if (filterConditionsCb) { if (!filterConditionsCb(event.returnValues)) { @@ -110,56 +99,34 @@ class EventSyncer { this.events.emit("event-" + eventName + "-" + namespace, event); }]) - // contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => { + // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions contractObserver.pipe().subscribe((e) => { - console.dir("------- syncing event"); e.eventKey = eventKey - // console.dir(e); - if (children.find({ 'id': e.id }).length > 0) { - console.dir("event already synced: " + e.id) - } else { - // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions - if (e.returnValues['$loki']) { // was already saved / synced - console.dir("already synced") - return sub.next(e.returnValues) - } - children.insert(e.returnValues) - tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) - this.events.emit("updateDB") - sub.next(e.returnValues) - } + if (children.find({ 'id': e.id }).length > 0) return; + if (e.returnValues['$loki']) return sub.next(e.returnValues) + + children.insert(e.returnValues) + tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) + this.events.emit("updateDB") + sub.next(e.returnValues) }) return sub; } - trackProperty(contractInstance, propName, filterConditionsOrCb) { - // let eventKey = propName + "-from0x123"; - let eventKey = propName; - let namespace = randomString() - - let filterConditions, filterConditionsCb; - if (typeof filterConditionsOrCb === 'function') { - filterConditionsCb = filterConditionsOrCb - } else { - filterConditions = filterConditionsOrCb - } - - let tracked = this.db.getCollection('tracked') - + // TODO: should save value in database + trackProperty(contractInstance, propName, methodArgs, callArgs) { let sub = new ReplaySubject(); - let children = this.db.getCollection('children') - // TODO: use call args from user let method = contractInstance.methods[propName].apply(contractInstance.methods[propName], []) - method.call.apply(method.call, [(filterConditions || {}), (err, result) => { + method.call.apply(method.call, [{}, (err, result) => { sub.next(result) }]) - var subscription = this.web3.eth.subscribe('newBlockHeaders', function (error, result) { - method.call.apply(method.call, [(filterConditions || {}), (err, result) => { + this.web3.eth.subscribe('newBlockHeaders', function (error, result) { + method.call.apply(method.call, [({}), (err, result) => { sub.next(result) }]) }) @@ -169,8 +136,4 @@ class EventSyncer { } -// process.on('exit', function () { -// db.close() -// }); - module.exports = EventSyncer; \ No newline at end of file diff --git a/src/utils.js b/src/utils.js new file mode 100644 index 0000000..d7af058 --- /dev/null +++ b/src/utils.js @@ -0,0 +1,8 @@ + +function randomString() { + return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); +} + +module.exports = { + randomString +};