commit
522f0948fc
|
@ -3,6 +3,10 @@ const { throttle, filter } = require('rxjs/operators');
|
||||||
const loki = require('lokijs')
|
const loki = require('lokijs')
|
||||||
const Events = require('events')
|
const Events = require('events')
|
||||||
|
|
||||||
|
function randomString() {
|
||||||
|
return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
|
||||||
|
}
|
||||||
|
|
||||||
class EventSyncer {
|
class EventSyncer {
|
||||||
|
|
||||||
constructor(web3) {
|
constructor(web3) {
|
||||||
|
@ -45,9 +49,18 @@ class EventSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// trackEvent(eventName, filterConditions) {
|
// trackEvent(eventName, filterConditions) {
|
||||||
trackEvent(contractInstance, eventName, filterConditions) {
|
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
|
||||||
// let eventKey = eventName + "-from0x123";
|
// let eventKey = eventName + "-from0x123";
|
||||||
let eventKey = eventName;
|
let eventKey = eventName;
|
||||||
|
let namespace = randomString()
|
||||||
|
|
||||||
|
|
||||||
|
let filterConditions, filterConditionsCb;
|
||||||
|
if (typeof filterConditionsOrCb === 'function') {
|
||||||
|
filterConditionsCb = filterConditionsOrCb
|
||||||
|
} else {
|
||||||
|
filterConditions = filterConditionsOrCb
|
||||||
|
}
|
||||||
|
|
||||||
let tracked = this.db.getCollection('tracked')
|
let tracked = this.db.getCollection('tracked')
|
||||||
let lastEvent = tracked.find({ "eventName": eventName })[0]
|
let lastEvent = tracked.find({ "eventName": eventName })[0]
|
||||||
|
@ -66,29 +79,35 @@ class EventSyncer {
|
||||||
sub.next(previous)
|
sub.next(previous)
|
||||||
}
|
}
|
||||||
|
|
||||||
let contractObserver = fromEvent(this.events, "event-" + eventName)
|
let contractObserver = fromEvent(this.events, "event-" + eventName + "-" + namespace)
|
||||||
|
|
||||||
// TODO: this should be moved to a 'smart' module
|
// TODO: this should be moved to a 'smart' module
|
||||||
// for e.g, it should start fromBlock, from the latest known block (which means it should store block info)
|
// for e.g, it should start fromBlock, from the latest known block (which means it should store block info)
|
||||||
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
||||||
contractInstance.events[eventName].apply(contractInstance.events[eventName], [(filterConditions || {fromBlock: 0}), (err, event) => {
|
contractInstance.events[eventName].apply(contractInstance.events[eventName], [(filterConditions || {fromBlock: 0}), (err, event) => {
|
||||||
let propsToFilter = [];
|
if (filterConditions) {
|
||||||
for (let prop in filterConditions.filter) {
|
let propsToFilter = [];
|
||||||
if (Object.keys(event.returnValues).indexOf(prop) >= 0) {
|
for (let prop in filterConditions.filter) {
|
||||||
propsToFilter.push(prop)
|
if (Object.keys(event.returnValues).indexOf(prop) >= 0) {
|
||||||
|
propsToFilter.push(prop)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (propsToFilter.length === 0) {
|
||||||
|
return this.events.emit("event-" + eventName + "-" + namespace, event);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (propsToFilter.length === 0) {
|
|
||||||
return this.events.emit("event-" + eventName, event);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (let prop of propsToFilter) {
|
for (let prop of propsToFilter) {
|
||||||
if (filterConditions.filter[prop] !== event.returnValues[prop]) {
|
if (filterConditions.filter[prop] !== event.returnValues[prop]) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (filterConditionsCb) {
|
||||||
|
if (!filterConditionsCb(event.returnValues)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.events.emit("event-" + eventName, event);
|
this.events.emit("event-" + eventName + "-" + namespace, event);
|
||||||
}])
|
}])
|
||||||
|
|
||||||
// contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => {
|
// contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => {
|
||||||
|
@ -100,12 +119,15 @@ class EventSyncer {
|
||||||
console.dir("event already synced: " + e.id)
|
console.dir("event already synced: " + e.id)
|
||||||
} else {
|
} else {
|
||||||
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
|
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
|
||||||
|
if (e.returnValues['$loki']) { // was already saved / synced
|
||||||
|
console.dir("already synced")
|
||||||
|
return sub.next(e.returnValues)
|
||||||
|
}
|
||||||
children.insert(e.returnValues)
|
children.insert(e.returnValues)
|
||||||
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id))
|
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id))
|
||||||
this.events.emit("updateDB")
|
this.events.emit("updateDB")
|
||||||
sub.next(e.returnValues)
|
sub.next(e.returnValues)
|
||||||
}
|
}
|
||||||
console.dir("-------");
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return sub;
|
return sub;
|
||||||
|
|
|
@ -95,8 +95,12 @@ async function run() {
|
||||||
eventSyncer.init(() => {
|
eventSyncer.init(() => {
|
||||||
|
|
||||||
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
|
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
|
||||||
eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => {
|
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => {
|
||||||
|
|
||||||
|
// eventSyncer.trackEvent(RatingContract, 'Rating').pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => {
|
||||||
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => {
|
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => {
|
||||||
|
|
||||||
|
eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => {
|
||||||
console.dir("value is ")
|
console.dir("value is ")
|
||||||
console.dir(v)
|
console.dir(v)
|
||||||
});
|
});
|
||||||
|
@ -106,8 +110,10 @@ async function run() {
|
||||||
return acc;
|
return acc;
|
||||||
}, [])
|
}, [])
|
||||||
|
|
||||||
|
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => {
|
||||||
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), last()).subscribe((v) => {
|
// eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), last()).subscribe((v) => {
|
||||||
eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => {
|
|
||||||
|
eventSyncer.trackEvent(RatingContract, 'Rating').pipe(map(x => parseInt(x.rating)), max, distinctUntilChanged()).subscribe((v) => {
|
||||||
console.dir("max known rating is")
|
console.dir("max known rating is")
|
||||||
console.dir(v)
|
console.dir(v)
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue