diff --git a/src/eventSyncer.js b/src/eventSyncer.js new file mode 100644 index 0000000..9da0b1c --- /dev/null +++ b/src/eventSyncer.js @@ -0,0 +1,54 @@ +const { Observable, fromEvent, interval, Subject, ReplaySubject } = require('rxjs'); +const { throttle, throttleTime, map, distinctUntilChanged, filter, average, reduce, count, scan } = require('rxjs/operators'); + +class EventSyncer { + + constructor(db, events) { + this.db = db; + this.events = events; + } + + trackEvent(eventName, filterConditions) { + let eventKey = eventName + "-from0x123"; + + let tracked = this.db.getCollection('tracked') + let lastEvent = tracked.find({ "eventName": eventName })[0] + if (!lastEvent || lastEvent.length <= 0) { + tracked.insert({ "eventName": eventName, id: 0 }) + lastEvent = tracked.find({ "eventName": eventName })[0] + } + + console.dir("last id was " + lastEvent.id) + + let sub = new ReplaySubject(); + + let children = this.db.getCollection('children') + for (let previous of children.find({ 'eventKey': eventKey })) { + console.dir("checking previous event: " + previous.id) + sub.next(previous) + } + + console.dir("after") + + let contractObserver = fromEvent(this.events, eventName) + contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => { + console.dir("------- syncing event"); + e.eventKey = eventKey + console.dir(e); + if (children.find({ 'id': e.id }).length > 0) { + console.dir("event already synced: " + e.id) + } else { + children.insert(e) + tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) + this.events.emit("updateDB") + sub.next(e) + } + console.dir("-------"); + }) + + return sub; + } + +} + +module.exports = EventSyncer; \ No newline at end of file diff --git a/src/index.js b/src/index.js index 974ecde..db460a8 100644 --- a/src/index.js +++ b/src/index.js @@ -1,12 +1,12 @@ // var Web3 = require('web3') const Events = require('events') -const events = new Events() -const Simulator = require('./simulator.js') -const simulator = new Simulator(events); - const { Observable, fromEvent, interval, Subject } = require('rxjs'); const { throttle, throttleTime, map, distinctUntilChanged, filter, average, reduce, count, scan } = require('rxjs/operators'); +const Simulator = require('./simulator.js') +const EventSyncer = require('./eventSyncer.js') + + var loki = require('lokijs') //var db = new loki('loki.json', {autosave: true, autoload: true}) @@ -18,6 +18,10 @@ var db = new loki('phoenix.db', { }) //db.loadDatabase() +const events = new Events() +const simulator = new Simulator(events); +const eventSyncer = new EventSyncer(db, events); + function databaseInitialize() { let children = db.getCollection('children') if (!children) { @@ -47,43 +51,6 @@ function run() { db.saveDatabase() }) - function trackEvent(eventName, filterConditions) { - let eventKey = eventName + "-from0x123"; - - let lastEvent = tracked.find({ "eventName": eventName })[0] - if (!lastEvent || lastEvent.length <= 0) { - tracked.insert({ "eventName": eventName, id: 0 }) - lastEvent = tracked.find({ "eventName": eventName })[0] - } - - console.dir("last id was " + lastEvent.id) - - let sub = new Subject(); - - for (let previous of children.find({ 'eventKey': eventKey })) { - console.dir("checking previous event: " + previous.id) - sub.next(previous) - } - - let contractObserver = fromEvent(events, eventName) - contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => { - console.dir("------- syncing event"); - e.eventKey = eventKey - console.dir(e); - if (children.find({ 'id': e.id }).length > 0) { - console.dir("event already synced: " + e.id) - } else { - children.insert(e) - tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) - events.emit("updateDB") - sub.next(e) - } - console.dir("-------"); - }) - - return sub; - } - let myscan = scan((acc, curr) => { acc.push(curr); if (acc.length > 4) { @@ -94,7 +61,7 @@ function run() { let mymap = map(arr => arr.reduce((acc, current) => acc + current, 0) / arr.length) - trackEvent('contractEvent', ((x) => x.from === "0x123")).pipe(map(x => x.rating), myscan, mymap).subscribe((v) => { + eventSyncer.trackEvent('contractEvent', ((x) => x.from === "0x123")).pipe(map(x => x.rating), myscan, mymap).subscribe((v) => { console.dir("current value is " + v) })