From 3e8a6a244df13b1d78ec32fe16c3e986946b5faf Mon Sep 17 00:00:00 2001 From: Iuri Matias Date: Fri, 9 Aug 2019 16:03:15 -0400 Subject: [PATCH] trackEvent working with real contracts & events --- src/eventSyncer.js | 32 ++++++++++++++++++++++++-------- src/index.js | 12 ++++++++++++ test/test1.js | 32 ++++++++++++++++++++++++++++++-- 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/eventSyncer.js b/src/eventSyncer.js index 569644b..f25ee4f 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -1,11 +1,14 @@ const { fromEvent, interval, ReplaySubject } = require('rxjs'); const { throttle, filter } = require('rxjs/operators'); const loki = require('lokijs') +const Events = require('events') class EventSyncer { - constructor(events) { - this.events = events; + constructor(web3) { + // this.events = events; + this.events = new Events; + this.web3 = web3; } init(cb) { @@ -41,7 +44,8 @@ class EventSyncer { cb(); } - trackEvent(eventName, filterConditions) { + // trackEvent(eventName, filterConditions) { + trackEvent(contractInstance, eventName, filterConditions) { // let eventKey = eventName + "-from0x123"; let eventKey = eventName; @@ -62,18 +66,30 @@ class EventSyncer { sub.next(previous) } - let contractObserver = fromEvent(this.events, eventName) - contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => { + let contractObserver = fromEvent(this.events, "event-" + eventName) + + // TODO: this should be moved to a 'smart' module + // for e.g, it should start fromBlock, from the latest known block (which means it should store block info) + // it should be able to do events X at the time to avoid slow downs as well as the 10k limit + contractInstance.events[eventName].apply(contractInstance.events[eventName], [{fromBlock: 0}, (err, event) => { + // let eventObject = event.returnValues; + // eventObject.id = event.id; + this.events.emit("event-" + eventName, event); + }]) + + // contractObserver.pipe(filter((x) => x.id > lastEvent.id)).pipe(filter(filterConditions)).subscribe((e) => { + contractObserver.pipe().subscribe((e) => { console.dir("------- syncing event"); e.eventKey = eventKey - console.dir(e); + // console.dir(e); if (children.find({ 'id': e.id }).length > 0) { console.dir("event already synced: " + e.id) } else { - children.insert(e) + // TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions + children.insert(e.returnValues) tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = e.id)) this.events.emit("updateDB") - sub.next(e) + sub.next(e.returnValues) } console.dir("-------"); }) diff --git a/src/index.js b/src/index.js index e69de29..1cac4a8 100644 --- a/src/index.js +++ b/src/index.js @@ -0,0 +1,12 @@ +const EventSyncer = require('./eventSyncer.js') + +const eventSyncer = new EventSyncer(web3); + +eventSyncer.init(() => {}); + + // eventSyncer.trackEvent('contractEvent', ((x) => x.from === "0x123")).pipe(map(x => x.rating), myscan, mymap).subscribe((v) => { + // console.dir("current average is " + v) + // }) + + +// return Event diff --git a/test/test1.js b/test/test1.js index 49b7ab6..bb96c87 100644 --- a/test/test1.js +++ b/test/test1.js @@ -1,7 +1,18 @@ +const { map, scan } = require('rxjs/operators'); const Web3 = require('web3'); let web3 = new Web3("ws://localhost:8545"); +let myscan = scan((acc, curr) => { + acc.push(curr); + if (acc.length > 4) { + acc.shift(); + } + return acc; +}, []) + +let mymap = map(arr => arr.reduce((acc, current) => acc + current, 0) / arr.length) + async function deployContract() { let accounts = await web3.eth.getAccounts(); @@ -74,10 +85,27 @@ async function run() { // RatingContract.events.getPastEvents('Rating', {fromBlock: 1}) RatingContract.events.Rating({fromBlock: 1}, (err, event) => { - console.dir("new event") - console.dir(event) + // console.dir("new event") + // console.dir(event) }) + const EventSyncer = require('../src/eventSyncer.js') + const eventSyncer = new EventSyncer(web3); + + eventSyncer.init(() => { + + eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => parseInt(x.rating)), myscan, mymap).subscribe((v) => { + // eventSyncer.trackEvent(RatingContract, 'Rating', ((x) => true)).pipe(map(x => x.rating)).subscribe((v) => { + console.dir("value is ") + console.dir(v) + }); + + }); + + // await RatingContract.methods.doRating(1, 5).send({from: accounts[0]}) + // await RatingContract.methods.doRating(1, 3).send({from: accounts[0]}) + // await RatingContract.methods.doRating(1, 1).send({from: accounts[0]}) + // await RatingContract.methods.doRating(1, 5).send({from: accounts[0]}) } run() \ No newline at end of file