feat: store event data and retrieve from db (#29)

* feat: store event data and retrieve from db
* refactor: event tracking
This commit is contained in:
Richard Ramos 2019-09-08 21:35:05 -04:00 committed by GitHub
parent bbcada804e
commit 2a213f053b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 3144 additions and 258 deletions

View File

@ -22,8 +22,9 @@
"@babel/core": "^7.1.6",
"@babel/plugin-proposal-class-properties": "^7.5.5",
"@babel/preset-env": "^7.1.6",
"babel-loader": "^8.0.4",
"add-module-exports-webpack-plugin": "^1.0.0",
"babel-loader": "^8.0.4",
"ganache-core": "^2.7.0",
"cross-env": "^5.2.0",
"npm-run-all": "^4.1.5",
"rimraf": "^2.6.2",
@ -36,6 +37,7 @@
"dependencies": {
"fast-deep-equal": "^2.0.1",
"lokijs": "^1.5.6",
"object-hash": "^1.3.1",
"rxjs": "^6.5.2",
"web3-eth": "^1.2.1"
},

View File

@ -1,5 +1,4 @@
import { fromEvent, interval, ReplaySubject } from 'rxjs';
import { throttle, distinctUntilChanged } from 'rxjs/operators';
import { fromEvent } from 'rxjs';
import loki from 'lokijs';
const getENV = function () {
@ -49,47 +48,47 @@ class Database {
children = this.db.addCollection('children')
this.db.saveDatabase()
}
let tracked = this.db.getCollection('tracked')
if (!tracked) {
tracked = this.db.addCollection('tracked')
this.db.saveDatabase()
}
let dbChanges = fromEvent(this.events, "updateDB")
dbChanges.pipe(throttle(val => interval(400))).subscribe(() => {
dbChanges.subscribe(() => {
this.db.saveDatabase()
})
cb();
}
getLastKnownEvent(eventName) {
let tracked = this.db.getCollection('tracked');
let lastEvent = tracked.find({ "eventName": eventName })[0];
if (!lastEvent || lastEvent.length <= 0) {
tracked.insert({ "eventName": eventName, id: 0 });
lastEvent = tracked.find({ "eventName": eventName })[0];
getLastKnownEvent(eventKey) {
const collection = this.db.getCollection(eventKey);
let firstKnownBlock = 0;
let lastKnownBlock = 0;
if(collection && collection.count()){
firstKnownBlock = collection.min('blockNumber');
lastKnownBlock = collection.max('blockNumber');
} else {
this.db.addCollection(eventKey);
}
return lastEvent;
return {
firstKnownBlock,
lastKnownBlock
};
}
updateEventId(eventName, eventId) {
let tracked = this.db.getCollection('tracked');
tracked.updateWhere(((x) => x.eventName === eventName), ((x) => x.id = eventId));
}
getEventsFor(eventKey) {
let children = this.db.getCollection('children');
return children.find({ 'eventKey': eventKey });
let children = this.db.getCollection(eventKey);
return children.find();
}
eventExists(eventId) {
let children = this.db.getCollection('children');
eventExists(eventKey, eventId) {
let children = this.db.getCollection(eventKey);
return (children.find({ 'id': eventId }).length > 0);
}
recordEvent(values) {
let children = this.db.getCollection('children');
recordEvent(eventKey, values) {
let children = this.db.getCollection(eventKey);
children.insert(values);
}

View File

@ -1,8 +1,7 @@
import { fromEvent, interval, ReplaySubject } from 'rxjs';
import { throttle, distinctUntilChanged } from 'rxjs/operators';
import { randomString } from './utils.js';
import { fromEvent, ReplaySubject } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
import equal from 'fast-deep-equal';
import hash from 'object-hash';
import Database from './database.js';
import Events from 'events';
import Web3Eth from 'web3-eth';
@ -33,62 +32,150 @@ export default class EventSyncer {
// TODO: get contract abi/address instead
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
// let eventKey = eventName + "-from0x123";
let eventKey = eventName;
let eventKey = eventName + '-' + hash(filterConditionsOrCb);
let filterConditions, filterConditionsCb;
let filterConditions = {fromBlock: 0, toBlock: "latest"};
let filterConditionsCb;
if (typeof filterConditionsOrCb === 'function') {
filterConditionsCb = filterConditionsOrCb
filterConditionsCb = filterConditionsOrCb;
} else {
filterConditions = filterConditionsOrCb
filterConditions = Object.assign(filterConditions, filterConditionsOrCb || {});
}
// TODO: should use this to resume events tracking
// let lastEvent = this._db.getLastKnownEvent(eventName)
let eventSummary = this._db.getLastKnownEvent(eventKey);
let sub = new ReplaySubject();
let contractObserver = fromEvent(this.events, eventKey)
this._db.getEventsFor(eventKey).forEach(sub.next);
contractObserver.subscribe((e) => {
if(!e) return;
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
let eventbusKey = "event-" + eventName + "-" + randomString();
let contractObserver = fromEvent(this.events, eventbusKey)
// TODO: this should be moved to a 'smart' module
// for e.g, it should start fromBlock, from the latest known block (which means it should store block info)
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
contractInstance.events[eventName].apply(contractInstance.events[eventName], [(filterConditions || {fromBlock: 0}), (err, event) => {
if (filterConditions) {
let propsToFilter = [];
for (let prop in filterConditions.filter) {
if (Object.keys(event.returnValues).indexOf(prop) >= 0) {
propsToFilter.push(prop)
}
}
for (let prop of propsToFilter) {
if (filterConditions.filter[prop] !== event.returnValues[prop]) return;
}
} else if (filterConditionsCb && !filterConditionsCb(event.returnValues)) {
return;
const eventData = {
id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}),
returnValues: {...e.returnValues},
blockNumber: e.blockNumber,
transactionIndex: e.transactionIndex,
logIndex: e.logIndex
}
this.events.emit(eventbusKey, event);
}])
sub.next({blockNumber: e.blockNumber, ...e.returnValues});
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
contractObserver.pipe().subscribe((e) => {
e.eventKey = eventKey
if (this._db.eventExists(e.id)) return;
if (e.returnValues['$loki']) return sub.next(e.returnValues)
if (this._db.eventExists(eventKey, eventData.id)) return;
this._db.recordEvent(e.returnValues);
this._db.updateEventId(eventName, e.id)
this.events.emit("updateDB")
sub.next(e.returnValues)
})
this._db.recordEvent(eventKey, eventData);
this.events.emit("updateDB");
});
this._retrieveEvents(eventKey,
eventSummary.firstKnownBlock,
eventSummary.lastKnownBlock,
filterConditions,
filterConditionsCb,
contractInstance,
eventName);
return sub;
}
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) {
// TODO: this should be moved to a 'smart' module
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
// TODO: filter subscriptions with fromBlock and toBlock
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
if (filterConditions.toBlock === 'latest') {
// emit DB Events [fromBlock, lastKnownBlock]
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
// create a event subscription [lastKnownBlock + 1, ...]
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
}
else if (filterConditions.toBlock <= lastKnownBlock) {
// emit DB Events [fromBlock, toBlock]
this._serveDBEvents(eventKey, filterConditions.fromBlock, filterConditions.toBlock, filterConditions, filterConditionsCb);
}
else {
// emit DB Events [fromBlock, lastKnownBlock]
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
// create a past event subscription [lastKnownBlock + 1, toBlock]
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
this._getPastEvents(contractInstance, eventName, filters, filterConditionsCb, eventKey);
}
}
else if (firstKnownBlock > 0) {
// create a past event subscription [ firstKnownBlock > fromBlock ? fromBlock : 0, firstKnownBlock - 1]
let fromBlock = firstKnownBlock > filterConditions.fromBlock ? filterConditions.fromBlock : 0;
let filters = Object.assign({}, filterConditions, { fromBlock, toBlock: firstKnownBlock - 1 });
this._getPastEvents(contractInstance, eventName, filters, filterConditionsCb, eventKey);
if (filterConditions.toBlock === 'latest') {
// emit DB Events [firstKnownBlock, lastKnownBlock]
this._serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb);
// create a subscription [lastKnownBlock + 1, ...]
const filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1 });
this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
}
else if (filterConditions.toBlock <= lastKnownBlock) {
// emit DB Events [fromBlock, toBlock]
this._serveDBEvents(eventKey, filterConditions.fromBlock, filterConditions.toBlock, filterConditions, filterConditionsCb);
}
else {
// emit DB Events [fromBlock, lastKnownBlock]
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
// create a past event subscription [lastKnownBlock + 1, toBlock]
let filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1, toBlock: filterConditions.toBlock });
this._getPastEvents(contractInstance, eventName, filters, filterConditionsCb, eventKey);
}
}
}
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) {
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
const storedEvents = this._db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
storedEvents.forEach(ev => {
cb(null, ev);
});
}
_getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) {
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => {
events.forEach(ev => {
cb(err, ev);
});
} ]);
}
_subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) {
event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
}
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
if(err) return;
if (filterConditions) {
let propsToFilter = [];
for (let prop in filterConditions.filter) {
if (Object.keys(ev.returnValues).indexOf(prop) >= 0) {
propsToFilter.push(prop);
}
}
for (let prop of propsToFilter) {
if (filterConditions.filter[prop] !== ev.returnValues[prop])
return;
}
}
else if (filterConditionsCb && !filterConditionsCb(ev.returnValues)) {
return;
}
this.events.emit(eventKey, ev);
}
_initNewBlocksSubscription() {
if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
@ -115,7 +202,7 @@ export default class EventSyncer {
}
// TODO: should save value in database
// TODO: should save value in database?
trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) {
const sub = new ReplaySubject();
@ -141,7 +228,7 @@ export default class EventSyncer {
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
}
// TODO: should save value in database (?)
// TODO: should save value in database?
trackBalance(address, erc20Address) {
const sub = new ReplaySubject();
@ -151,6 +238,7 @@ export default class EventSyncer {
if(!erc20Address){
callFn = () => {
const fn = this.web3.getBalance;
fn.apply(fn, [address, (err, balance) => {
if(err) {
sub.error(err);
@ -191,5 +279,4 @@ export default class EventSyncer {
this.callables = [];
}
}
}

View File

@ -1,23 +1,24 @@
const ganache = require("ganache-core");
const Web3Eth = require('web3-eth');
let eth = new Web3Eth("ws://localhost:8545");
console.log("The following error is emitted by ganache - https://github.com/trufflesuite/ganache-core/issues/267")
let eth = new Web3Eth(ganache.provider());
async function run() {
let accounts = await eth.getAccounts();
setTimeout(async () => {
await eth.sendTransaction({from: accounts[0], to: accounts[1], value: "100000000"});
await eth.sendTransaction({from: accounts[2], to: accounts[0], value: "999999999"});
await eth.sendTransaction({from: accounts[2], to: accounts[0], value: "232433434"});
}, 3000);
}, 2000);
const EventSyncer = require('../dist/node.js');
const eventSyncer = new EventSyncer(eth.currentProvider);
await eventSyncer.init();
eventSyncer.trackBalance(accounts[0]).pipe().subscribe((balance) => {
eventSyncer.trackBalance(accounts[0]).subscribe((balance) => {
console.log("accounts[0] balance is ", balance);
})

75
test/test6.js Normal file
View File

@ -0,0 +1,75 @@
const Web3Eth = require('web3-eth');
const {deployRatingContract, mine} = require('./utils-web3');
const ganache = require("ganache-core");
console.log("The following error is emitted by ganache - https://github.com/trufflesuite/ganache-core/issues/267")
let eth = new Web3Eth(ganache.provider());
async function run() {
let accounts = await eth.getAccounts();
var RatingContract = await deployRatingContract(eth)
// Events are generated in these blocks:
// x x x x x x x x x
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20
await mine(eth);
await RatingContract.methods.doRating(1, 5).send({from: accounts[0]})
await mine(eth);
await mine(eth);
await RatingContract.methods.doRating(2, 3).send({from: accounts[0]})
await RatingContract.methods.doRating(3, 1).send({from: accounts[0]})
await RatingContract.methods.doRating(4, 5).send({from: accounts[0]})
await RatingContract.methods.doRating(5, 5).send({from: accounts[0]})
await mine(eth);
await RatingContract.methods.doRating(6, 5).send({from: accounts[0]})
await mine(eth);
await mine(eth);
await RatingContract.methods.doRating(7, 5).send({from: accounts[0]})
await RatingContract.methods.doRating(8, 5).send({from: accounts[0]})
await mine(eth);
await mine(eth);
await mine(eth);
await mine(eth);
await RatingContract.methods.doRating(8, 5).send({from: accounts[0]})
const EventSyncer = require('../dist/node.js');
const eventSyncer = new EventSyncer(eth.currentProvider);
await eventSyncer.init()
// Testing single block with a event
eventSyncer.trackEvent(RatingContract, 'Rating', {fromBlock: 3, toBlock: 3}).subscribe((v) => {
console.log("A", v)
});
// Testing blocks that have no events in between
eventSyncer.trackEvent(RatingContract, 'Rating', {fromBlock: 8, toBlock: 11}).subscribe((v) => {
console.log("B", v)
});
// Testing blocks that begin with no events
eventSyncer.trackEvent(RatingContract, 'Rating', {fromBlock: 12, toBlock: 15}).subscribe((v) => {
console.log("C", v)
});
// Testing all blocks
eventSyncer.trackEvent(RatingContract, 'Rating', {}).subscribe((v) => {
console.log("D", v)
});
// Testing blocks that end in no events
eventSyncer.trackEvent(RatingContract, 'Rating', {fromBlock: 14, toBlock: 18}).subscribe((v) => {
console.log("E", v)
});
setTimeout(() => {
// Testing if events come from the DB instead of a subscription
eventSyncer.trackEvent(RatingContract, 'Rating', {fromBlock: 7, toBlock: 11}).subscribe((v) => {
console.log("E", v)
});
}, 5000);
}
run()

View File

@ -118,7 +118,8 @@ async function deployRatingContract(eth) {
}
]
var contract = new eth.Contract(abi)
var contract = new eth.Contract(abi);
let instance = await contract.deploy({
data: '0x608060405234801561001057600080fd5b5060e78061001f6000396000f3fe6080604052600436106039576000357c010000000000000000000000000000000000000000000000000000000090048063f60781a914603e575b600080fd5b348015604957600080fd5b50607d60048036036040811015605e57600080fd5b810190808035906020019092919080359060200190929190505050607f565b005b817ffdefdf8d82459f7b1eb157e5c44cbe6ee73d8ddd387511fe3622a3ee663b4697826040518082815260200191505060405180910390a2505056fea165627a7a7230582067833697a0e2bccb8bd624c0b06b2183641addb24f7931d8ec3979982bb663790029',
arguments: []
@ -128,9 +129,24 @@ async function deployRatingContract(eth) {
})
return instance
}
const mine = (web3) => {
return new Promise((resolve, reject) => {
web3.currentProvider.send({
jsonrpc: '2.0',
method: 'evm_mine',
id: new Date().getTime()
}, (err, result) => {
if (err) { return reject(err) }
return resolve(result)
})
})
}
module.exports = {
deployEscrowContract,
deployRatingContract
deployRatingContract,
mine
};

3062
yarn.lock

File diff suppressed because it is too large Load Diff