clean up eventSyncer code
This commit is contained in:
parent
bd2721bf8c
commit
40fb22f280
|
@ -1,20 +1,18 @@
|
|||
const { fromEvent, interval, ReplaySubject } = require('rxjs');
|
||||
const { throttle, filter, distinctUntilChanged } = require('rxjs/operators');
|
||||
const loki = require('lokijs')
|
||||
const Events = require('events')
|
||||
|
||||
function randomString() {
|
||||
return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
|
||||
}
|
||||
const { throttle, distinctUntilChanged } = require('rxjs/operators');
|
||||
const { randomString } = require('./utils.js');
|
||||
const loki = require('lokijs');
|
||||
const Events = require('events');
|
||||
|
||||
class EventSyncer {
|
||||
|
||||
// TODO: pass provider instead of web3 object
|
||||
constructor(web3) {
|
||||
// this.events = events;
|
||||
this.events = new Events;
|
||||
this.web3 = web3;
|
||||
}
|
||||
|
||||
// TODO: pass config on where to save database
|
||||
init(cb) {
|
||||
this.db = new loki('phoenix.db', {
|
||||
autoload: true,
|
||||
|
@ -22,7 +20,7 @@ class EventSyncer {
|
|||
this.databaseInitialize(cb)
|
||||
},
|
||||
autosave: true,
|
||||
autosaveInterval: 2000 // save every four seconds for our example
|
||||
autosaveInterval: 2000
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -39,22 +37,18 @@ class EventSyncer {
|
|||
}
|
||||
|
||||
let dbChanges = fromEvent(this.events, "updateDB")
|
||||
|
||||
dbChanges.pipe(throttle(val => interval(400))).subscribe(() => {
|
||||
console.dir("saving database...")
|
||||
this.db.saveDatabase()
|
||||
})
|
||||
|
||||
cb();
|
||||
}
|
||||
|
||||
// trackEvent(eventName, filterConditions) {
|
||||
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
|
||||
// let eventKey = eventName + "-from0x123";
|
||||
let eventKey = eventName;
|
||||
let namespace = randomString()
|
||||
|
||||
|
||||
let filterConditions, filterConditionsCb;
|
||||
if (typeof filterConditionsOrCb === 'function') {
|
||||
filterConditionsCb = filterConditionsOrCb
|
||||
|
@ -69,13 +63,10 @@ class EventSyncer {
|
|||
lastEvent = tracked.find({ "eventName": eventName })[0]
|
||||
}
|
||||
|
||||
console.dir("last id was " + lastEvent.id)
|
||||
|
||||
let sub = new ReplaySubject();
|
||||
|
||||
let children = this.db.getCollection('children')
|
||||
for (let previous of children.find({ 'eventKey': eventKey })) {
|
||||
console.dir("checking previous event: " + previous.id)
|
||||
sub.next(previous)
|
||||
}
|
||||
|
||||
|
@ -97,9 +88,7 @@ class EventSyncer {
|
|||
}
|
||||
|
||||
for (let prop of propsToFilter) {
|
||||
if (filterConditions.filter[prop] !== event.returnValues[prop]) {
|
||||
return;
|
||||
}
|
||||
if (filterConditions.filter[prop] !== event.returnValues[prop]) return;
|
||||
}
|
||||
} else if (filterConditionsCb) {
|
||||
if (!filterConditionsCb(event.returnValues)) {
|
||||
|
@ -110,56 +99,34 @@ class EventSyncer {
|
|||
this.events.emit("event-" + eventName + "-" + namespace, event);
|
||||
}])
|
||||
|
||||
// contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => {
|
||||
contractObserver.pipe().subscribe((e) => {
|
||||
console.dir("------- syncing event");
|
||||
e.eventKey = eventKey
|
||||
// console.dir(e);
|
||||
if (children.find({ 'id': e.id }).length > 0) {
|
||||
console.dir("event already synced: " + e.id)
|
||||
} else {
|
||||
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
|
||||
if (e.returnValues['$loki']) { // was already saved / synced
|
||||
console.dir("already synced")
|
||||
return sub.next(e.returnValues)
|
||||
}
|
||||
contractObserver.pipe().subscribe((e) => {
|
||||
e.eventKey = eventKey
|
||||
if (children.find({ 'id': e.id }).length > 0) return;
|
||||
if (e.returnValues['$loki']) return sub.next(e.returnValues)
|
||||
|
||||
children.insert(e.returnValues)
|
||||
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id))
|
||||
this.events.emit("updateDB")
|
||||
sub.next(e.returnValues)
|
||||
}
|
||||
})
|
||||
|
||||
return sub;
|
||||
}
|
||||
|
||||
trackProperty(contractInstance, propName, filterConditionsOrCb) {
|
||||
// let eventKey = propName + "-from0x123";
|
||||
let eventKey = propName;
|
||||
let namespace = randomString()
|
||||
|
||||
let filterConditions, filterConditionsCb;
|
||||
if (typeof filterConditionsOrCb === 'function') {
|
||||
filterConditionsCb = filterConditionsOrCb
|
||||
} else {
|
||||
filterConditions = filterConditionsOrCb
|
||||
}
|
||||
|
||||
let tracked = this.db.getCollection('tracked')
|
||||
|
||||
// TODO: should save value in database
|
||||
trackProperty(contractInstance, propName, methodArgs, callArgs) {
|
||||
let sub = new ReplaySubject();
|
||||
|
||||
let children = this.db.getCollection('children')
|
||||
|
||||
// TODO: use call args from user
|
||||
let method = contractInstance.methods[propName].apply(contractInstance.methods[propName], [])
|
||||
|
||||
method.call.apply(method.call, [(filterConditions || {}), (err, result) => {
|
||||
method.call.apply(method.call, [{}, (err, result) => {
|
||||
sub.next(result)
|
||||
}])
|
||||
|
||||
var subscription = this.web3.eth.subscribe('newBlockHeaders', function (error, result) {
|
||||
method.call.apply(method.call, [(filterConditions || {}), (err, result) => {
|
||||
this.web3.eth.subscribe('newBlockHeaders', function (error, result) {
|
||||
method.call.apply(method.call, [({}), (err, result) => {
|
||||
sub.next(result)
|
||||
}])
|
||||
})
|
||||
|
@ -169,8 +136,4 @@ class EventSyncer {
|
|||
|
||||
}
|
||||
|
||||
// process.on('exit', function () {
|
||||
// db.close()
|
||||
// });
|
||||
|
||||
module.exports = EventSyncer;
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
function randomString() {
|
||||
return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
randomString
|
||||
};
|
Loading…
Reference in New Issue