move event syncing to its own module; use ReplaySubject instead of Subject
This commit is contained in:
parent
49958853e5
commit
17fbaab75c
|
@ -0,0 +1,54 @@
|
||||||
|
const { Observable, fromEvent, interval, Subject, ReplaySubject } = require('rxjs');
|
||||||
|
const { throttle, throttleTime, map, distinctUntilChanged, filter, average, reduce, count, scan } = require('rxjs/operators');
|
||||||
|
|
||||||
|
class EventSyncer {
|
||||||
|
|
||||||
|
constructor(db, events) {
|
||||||
|
this.db = db;
|
||||||
|
this.events = events;
|
||||||
|
}
|
||||||
|
|
||||||
|
trackEvent(eventName, filterConditions) {
|
||||||
|
let eventKey = eventName + "-from0x123";
|
||||||
|
|
||||||
|
let tracked = this.db.getCollection('tracked')
|
||||||
|
let lastEvent = tracked.find({ "eventName": eventName })[0]
|
||||||
|
if (!lastEvent || lastEvent.length <= 0) {
|
||||||
|
tracked.insert({ "eventName": eventName, id: 0 })
|
||||||
|
lastEvent = tracked.find({ "eventName": eventName })[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
console.dir("last id was " + lastEvent.id)
|
||||||
|
|
||||||
|
let sub = new ReplaySubject();
|
||||||
|
|
||||||
|
let children = this.db.getCollection('children')
|
||||||
|
for (let previous of children.find({ 'eventKey': eventKey })) {
|
||||||
|
console.dir("checking previous event: " + previous.id)
|
||||||
|
sub.next(previous)
|
||||||
|
}
|
||||||
|
|
||||||
|
console.dir("after")
|
||||||
|
|
||||||
|
let contractObserver = fromEvent(this.events, eventName)
|
||||||
|
contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => {
|
||||||
|
console.dir("------- syncing event");
|
||||||
|
e.eventKey = eventKey
|
||||||
|
console.dir(e);
|
||||||
|
if (children.find({ 'id': e.id }).length > 0) {
|
||||||
|
console.dir("event already synced: " + e.id)
|
||||||
|
} else {
|
||||||
|
children.insert(e)
|
||||||
|
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id))
|
||||||
|
this.events.emit("updateDB")
|
||||||
|
sub.next(e)
|
||||||
|
}
|
||||||
|
console.dir("-------");
|
||||||
|
})
|
||||||
|
|
||||||
|
return sub;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = EventSyncer;
|
51
src/index.js
51
src/index.js
|
@ -1,12 +1,12 @@
|
||||||
// var Web3 = require('web3')
|
// var Web3 = require('web3')
|
||||||
const Events = require('events')
|
const Events = require('events')
|
||||||
const events = new Events()
|
|
||||||
const Simulator = require('./simulator.js')
|
|
||||||
const simulator = new Simulator(events);
|
|
||||||
|
|
||||||
const { Observable, fromEvent, interval, Subject } = require('rxjs');
|
const { Observable, fromEvent, interval, Subject } = require('rxjs');
|
||||||
const { throttle, throttleTime, map, distinctUntilChanged, filter, average, reduce, count, scan } = require('rxjs/operators');
|
const { throttle, throttleTime, map, distinctUntilChanged, filter, average, reduce, count, scan } = require('rxjs/operators');
|
||||||
|
|
||||||
|
const Simulator = require('./simulator.js')
|
||||||
|
const EventSyncer = require('./eventSyncer.js')
|
||||||
|
|
||||||
|
|
||||||
var loki = require('lokijs')
|
var loki = require('lokijs')
|
||||||
//var db = new loki('loki.json', {autosave: true, autoload: true})
|
//var db = new loki('loki.json', {autosave: true, autoload: true})
|
||||||
|
|
||||||
|
@ -18,6 +18,10 @@ var db = new loki('phoenix.db', {
|
||||||
})
|
})
|
||||||
//db.loadDatabase()
|
//db.loadDatabase()
|
||||||
|
|
||||||
|
const events = new Events()
|
||||||
|
const simulator = new Simulator(events);
|
||||||
|
const eventSyncer = new EventSyncer(db, events);
|
||||||
|
|
||||||
function databaseInitialize() {
|
function databaseInitialize() {
|
||||||
let children = db.getCollection('children')
|
let children = db.getCollection('children')
|
||||||
if (!children) {
|
if (!children) {
|
||||||
|
@ -47,43 +51,6 @@ function run() {
|
||||||
db.saveDatabase()
|
db.saveDatabase()
|
||||||
})
|
})
|
||||||
|
|
||||||
function trackEvent(eventName, filterConditions) {
|
|
||||||
let eventKey = eventName + "-from0x123";
|
|
||||||
|
|
||||||
let lastEvent = tracked.find({ "eventName": eventName })[0]
|
|
||||||
if (!lastEvent || lastEvent.length <= 0) {
|
|
||||||
tracked.insert({ "eventName": eventName, id: 0 })
|
|
||||||
lastEvent = tracked.find({ "eventName": eventName })[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
console.dir("last id was " + lastEvent.id)
|
|
||||||
|
|
||||||
let sub = new Subject();
|
|
||||||
|
|
||||||
for (let previous of children.find({ 'eventKey': eventKey })) {
|
|
||||||
console.dir("checking previous event: " + previous.id)
|
|
||||||
sub.next(previous)
|
|
||||||
}
|
|
||||||
|
|
||||||
let contractObserver = fromEvent(events, eventName)
|
|
||||||
contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => {
|
|
||||||
console.dir("------- syncing event");
|
|
||||||
e.eventKey = eventKey
|
|
||||||
console.dir(e);
|
|
||||||
if (children.find({ 'id': e.id }).length > 0) {
|
|
||||||
console.dir("event already synced: " + e.id)
|
|
||||||
} else {
|
|
||||||
children.insert(e)
|
|
||||||
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id))
|
|
||||||
events.emit("updateDB")
|
|
||||||
sub.next(e)
|
|
||||||
}
|
|
||||||
console.dir("-------");
|
|
||||||
})
|
|
||||||
|
|
||||||
return sub;
|
|
||||||
}
|
|
||||||
|
|
||||||
let myscan = scan((acc, curr) => {
|
let myscan = scan((acc, curr) => {
|
||||||
acc.push(curr);
|
acc.push(curr);
|
||||||
if (acc.length > 4) {
|
if (acc.length > 4) {
|
||||||
|
@ -94,7 +61,7 @@ function run() {
|
||||||
|
|
||||||
let mymap = map(arr => arr.reduce((acc, current) => acc + current, 0) / arr.length)
|
let mymap = map(arr => arr.reduce((acc, current) => acc + current, 0) / arr.length)
|
||||||
|
|
||||||
trackEvent('contractEvent', ((x) => x.from === "0x123")).pipe(map(x => x.rating), myscan, mymap).subscribe((v) => {
|
eventSyncer.trackEvent('contractEvent', ((x) => x.from === "0x123")).pipe(map(x => x.rating), myscan, mymap).subscribe((v) => {
|
||||||
console.dir("current value is " + v)
|
console.dir("current value is " + v)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue