feat: observable events (#73)

* feat: use observables instead of subjects for events
* Combine ws and http event syncer in a single file and complete observables
* Remove event scanners and simplify db serve function
This commit is contained in:
Richard Ramos 2020-03-04 14:18:14 -04:00 committed by GitHub
parent 48164fc0f0
commit 97c1131fc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 183 additions and 236 deletions

View File

@ -41,8 +41,17 @@ class Database {
}
databaseInitialize(cb) {
let dbChanges = fromEvent(this.events, "updateDB");
dbChanges.subscribe(() => {
// TODO: save in the db results by using a combination of concatMap and scan, to save results in batches
fromEvent(this.events, "updateDB").subscribe(({eventKey, eventData}) => {
if (eventData.removed) {
this.deleteEvent(eventKey, eventData.id);
return;
}
if (this.eventExists(eventKey, eventData.id)) return;
this.recordEvent(eventKey, eventData);
this.db.saveDatabase();
});
}

102
src/eventScanner.js Normal file
View File

@ -0,0 +1,102 @@
import {sleep} from "./utils";
class EventScanner {
constructor(web3, isWebsocketProvider) {
this.pollExecution = [];
this.subscriptions = [];
this.web3 = web3;
this.isWebsocketProvider = isWebsocketProvider;
}
async poll(execId, fn, timeout) {
const shouldStop = await fn();
if (!this.pollExecution[execId] || shouldStop) return;
if (timeout) await sleep(timeout * 1000);
await this.poll(execId, fn, timeout);
}
async scan(serveDBEvents, getPastEvents, subscribe, lastCachedBlock, filterConditions) {
const execId = this.pollExecution.push(true) - 1;
const maxBlockRange = 500000; // TODO: extract to config
const lastBlockNumberAtLoad = await this.web3.getBlockNumber();
// If there's a toBlock with a number
let toBlockFilter = 0;
if (filterConditions.toBlock && filterConditions.toBlock !== "latest") {
toBlockFilter = filterConditions.toBlock;
}
const toBlockInPast = toBlockFilter && toBlockFilter < lastBlockNumberAtLoad;
// Events from the DB
let shouldStop = serveDBEvents(toBlockFilter, lastCachedBlock);
if(shouldStop) return;
// Get old events and store them in db
lastCachedBlock = Math.max(lastCachedBlock + 1, filterConditions.fromBlock || 0);
await this.poll(execId, async () => {
try {
const maxBlock = Math.min(lastCachedBlock + maxBlockRange, lastBlockNumberAtLoad);
const toBlock = toBlockInPast ? Math.min(maxBlock, toBlockFilter) : maxBlock;
const toBlockLimit = Math.min(await this.web3.getBlockNumber(), toBlock);
if (toBlockLimit >= lastCachedBlock) {
shouldStop = await getPastEvents(lastCachedBlock, toBlockLimit, toBlockInPast ? toBlockFilter : null);
lastCachedBlock = toBlockLimit + 1;
}
} catch (e) {
console.log(e.toString());
}
// Should exit?
return (
shouldStop ||
(toBlockInPast && lastCachedBlock >= toBlockFilter) ||
lastCachedBlock > Math.max(lastBlockNumberAtLoad, toBlockInPast ? toBlockFilter || 0 : 0)
);
});
if(shouldStop) return;
if (toBlockInPast) return;
// Subscriptions
if(this.isWebsocketProvider){
let filters = Object.assign({}, filterConditions, {
fromBlock: lastCachedBlock,
toBlock: "latest" // TODO: use a proper toBlock depending if the toBlock is in the future and !== "latest"
});
return subscribe(this.subscriptions, filters);
} else {
// Get new data, with a timeout between requests
await this.poll(
execId,
async () => {
try {
let toBlockLimit = await this.web3.getBlockNumber();
if (toBlockLimit >= lastCachedBlock) {
await getPastEvents(lastCachedBlock, toBlockLimit, toBlockFilter);
lastCachedBlock = toBlockLimit + 1;
}
} catch (e) {
console.log(e.toString());
}
// Should exit?
return (
filterConditions.toBlock !== "latest" && lastCachedBlock > Math.max(lastBlockNumberAtLoad, toBlockFilter)
);
},
1
);
}
}
close() {
this.pollExecution = Array(this.pollExecution.length).fill(false);
this.subscriptions.forEach(x => x.unsubscribe());
}
}
export default EventScanner;

View File

@ -1,15 +1,15 @@
import {fromEvent, ReplaySubject} from "rxjs";
import {Observable} from "rxjs";
import hash from "object-hash";
import HttpEventScanner from "./httpEventScanner";
import WsEventScanner from "./wsEventScanner";
import EventScanner from "./eventScanner";
class EventSyncer {
constructor(web3, events, db, isWebsocketProvider) {
this.events = events;
this.web3 = web3;
this.db = db;
this.isWebsocketProvider = isWebsocketProvider;
this.eventScanner = isWebsocketProvider ? new WsEventScanner(web3) : new HttpEventScanner(web3);
this.eventScanner = new EventScanner(web3, isWebsocketProvider);
}
track(contractInstance, eventName, filters, gteBlockNum, networkId) {
@ -19,96 +19,70 @@ class EventSyncer {
let filterConditions = Object.assign({fromBlock: 0, toBlock: "latest"}, filters ?? {});
let lastKnownBlock = this.db.getLastKnownEvent(eventKey);
let firstKnownBlock = this.db.getFirstKnownEvent(eventKey);
let sub = new ReplaySubject();
let contractObserver = fromEvent(this.events, eventKey);
const observable = new Observable(subscriber => {
const cb = this.callbackFactory(subscriber, filters, eventKey, eventName);
const fnDBEvents = this.serveDBEvents(cb, eventKey);
const fnPastEvents = this.getPastEvents(cb, eventKey, contractInstance, eventName, filters);
const fnSubscribe = this.isWebsocketProvider ? this.subscribeToEvent(cb, contractInstance, eventName) : null;
contractObserver.subscribe(e => {
if (!e) {
sub.complete();
return;
}
const id = hash({
eventName,
blockNumber: e.blockNumber,
transactionIndex: e.transactionIndex,
logIndex: e.logIndex
});
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
const eventData = {
id,
returnValues: {...e.returnValues},
blockNumber: e.blockNumber,
transactionIndex: e.transactionIndex,
logIndex: e.logIndex,
removed: e.removed
};
// TODO: test reorgs
sub.next({blockNumber: e.blockNumber, ...e.returnValues});
if (e.removed) {
this.db.deleteEvent(eventKey, id);
return;
}
if (this.db.eventExists(eventKey, eventData.id)) return;
this.db.recordEvent(eventKey, eventData);
this.events.emit("updateDB");
});
const fnDBEvents = this.serveDBEvents(eventKey);
const fnPastEvents = this.getPastEvents(eventKey, contractInstance, eventName, filters);
if (this.isWebsocketProvider) {
const fnSubscribe = this.subscribeToEvent(eventKey, contractInstance, eventName);
const ethSubscription = this.eventScanner.scan(
fnDBEvents,
fnPastEvents,
fnSubscribe,
firstKnownBlock,
lastKnownBlock,
filterConditions
);
return [sub, ethSubscription];
} else {
this.eventScanner.scan(fnDBEvents, fnPastEvents, lastKnownBlock, filterConditions);
return [sub, undefined];
}
return () => {
if (ethSubscription) {
ethSubscription.then(s => {
if (s) {
s.unsubscribe();
}
});
}
};
});
return observable;
}
getPastEvents = (eventKey, contractInstance, eventName, filters) => async (fromBlock, toBlock, hardLimit) => {
getPastEvents = (cb, eventKey, contractInstance, eventName, filters) => async (fromBlock, toBlock, hardLimit) => {
let events = await contractInstance.getPastEvents(eventName, {...filters, fromBlock, toBlock});
const cb = this.callbackFactory(filters, eventKey);
events.forEach(ev => cb(null, ev));
if (hardLimit && toBlock === hardLimit) {
cb(null, null, true); // Complete observable
return true;
}
return false;
};
serveDBEvents = (cb, eventKey) => (toBlock, lastCachedBlock) => {
const events = this.db.getEventsFor(eventKey);
events.forEach(ev => cb(null, ev));
if (hardLimit && toBlock === hardLimit) {
// Complete the observable
this.events.emit(eventKey);
if(toBlock > 0 && lastCachedBlock >= toBlock) {
cb(null, null, true); // Complete observable
return true;
}
return false;
};
serveDBEvents = eventKey => (filters, toBlock, fromBlock = null) => {
const cb = this.callbackFactory(filters, eventKey);
this.db.getEventsFor(eventKey)
.filter(x => x.blockNumber >= (fromBlock || filters.fromBlock) && x.blockNumber <= toBlock)
.forEach(ev => cb(null, ev));
};
subscribeToEvent = (eventKey, contractInstance, eventName) => (subscriptions, filters) => {
const cb = this.callbackFactory(filters, eventKey);
const s = contractInstance.events[eventName](filters, cb);
subscribeToEvent = (cb, contractInstance, eventName) => (subscriptions, filters) => {
const s = contractInstance.events[eventName](filters, (err, event) => cb(err, event));
subscriptions.push(s);
// TODO: Complete observable if necessary
return s;
};
callbackFactory = (filterConditions, eventKey) => (err, ev) => {
callbackFactory = (subscriber, filterConditions, eventKey, eventName) => (err, event, complete = false) => {
if (complete) {
subscriber.complete();
return;
}
if (err) {
console.error(err);
return;
@ -117,16 +91,32 @@ class EventSyncer {
if (filterConditions) {
let propsToFilter = [];
for (let prop in filterConditions.filter) {
if (Object.keys(ev.returnValues).indexOf(prop) >= 0) {
if (Object.keys(event.returnValues).indexOf(prop) >= 0) {
propsToFilter.push(prop);
}
}
for (let prop of propsToFilter) {
if (filterConditions.filter[prop] !== ev.returnValues[prop]) return;
if (filterConditions.filter[prop] !== event.returnValues[prop]) return;
}
}
this.events.emit(eventKey, ev);
const eventData = {
id: hash({
eventName,
blockNumber: event.blockNumber,
transactionIndex: event.transactionIndex,
logIndex: event.logIndex
}),
returnValues: {...event.returnValues},
blockNumber: event.blockNumber,
transactionIndex: event.transactionIndex,
logIndex: event.logIndex,
removed: event.removed
};
subscriber.next(eventData.returnValues);
this.events.emit("updateDB", {eventKey, eventData});
};
close() {

View File

@ -1,89 +0,0 @@
import {sleep} from "./utils";
class HttpEventScanner {
constructor(web3) {
this.pollExecution = [];
this.web3 = web3;
}
async poll(execId, fn, timeout) {
const shouldStop = await fn();
if (!this.pollExecution[execId] || shouldStop) return;
if (timeout) await sleep(timeout * 1000);
await this.poll(execId, fn, timeout);
}
async scan(serveDBEvents, getPastEvents, lastCachedBlock, filterConditions) {
const execId = this.pollExecution.push(true) - 1;
const maxBlockRange = 500000; // TODO: extract to config
const lastBlockNumberAtLoad = await this.web3.getBlockNumber();
// If there's a toBlock with a number
let toBlockFilter = 0;
if (filterConditions.toBlock && filterConditions.toBlock !== "latest") {
toBlockFilter = filterConditions.toBlock;
}
const toBlockInPast = toBlockFilter && toBlockFilter < lastBlockNumberAtLoad;
// Determine if data already exists and return it.
let dbLimit = toBlockFilter > 0 ? Math.min(toBlockFilter, lastCachedBlock) : lastCachedBlock;
if (lastCachedBlock > 0 && filterConditions.fromBlock >= 0) {
serveDBEvents(filterConditions, dbLimit);
lastCachedBlock = lastCachedBlock + 1;
}
lastCachedBlock = Math.max(lastCachedBlock, filterConditions.fromBlock || 0);
// Get old events and store them in db
await this.poll(execId, async () => {
try {
const maxBlock = Math.min(lastCachedBlock + maxBlockRange, lastBlockNumberAtLoad);
const toBlock = toBlockInPast ? Math.min(maxBlock, toBlockFilter) : maxBlock;
const toBlockLimit = Math.min(await this.web3.getBlockNumber(), toBlock);
if (toBlockLimit >= lastCachedBlock) {
await getPastEvents(lastCachedBlock, toBlockLimit, toBlockInPast ? toBlockFilter : null);
lastCachedBlock = toBlockLimit + 1;
}
} catch (e) {
console.log(e.toString());
}
// Should exit?
return (
(toBlockInPast && lastCachedBlock >= (toBlockFilter || 0)) ||
lastCachedBlock > Math.max(lastBlockNumberAtLoad, toBlockInPast ? toBlockFilter || 0 : 0)
);
});
if (toBlockInPast) return;
// Get new data, with a timeout between requests
await this.poll(
execId,
async () => {
try {
let toBlockLimit = await this.web3.getBlockNumber();
if (toBlockLimit >= lastCachedBlock) {
await getPastEvents(lastCachedBlock, toBlockLimit, toBlockFilter || 0);
lastCachedBlock = toBlockLimit + 1;
}
} catch (e) {
console.log(e.toString());
}
// Should exit?
return (
filterConditions.toBlock !== "latest" && lastCachedBlock > Math.max(lastBlockNumberAtLoad, toBlockFilter || 0)
);
},
1
);
}
close() {
this.pollExecution = Array(this.pollExecution.length).fill(false);
}
}
export default HttpEventScanner;

View File

@ -209,13 +209,13 @@ export default class Subspace {
return this._getObservable(subjectHash, () => {
const deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks;
const [subject, ethSubscription] = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId);
const observable = this.eventSyncer
.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId)
.pipe(shareReplay({refCount: true}));
subject.map = mapFunc(subject);
observable.map = mapFunc(observable);
// TODO: remove eth subscription
return subject;
return observable;
});
}

View File

@ -1,65 +0,0 @@
class WsEventScanner {
constructor(web3) {
this.web3 = web3;
this.subscriptions = [];
}
async scan(serveDBEvents, getPastEvents, subscribe, firstKnownBlock, lastKnownBlock, filterConditions) {
const lastBlockNumberAtLoad = await this.web3.getBlockNumber();
// If there's a toBlock with a number
let toBlockFilter = 0;
if (filterConditions.toBlock && filterConditions.toBlock !== "latest") {
toBlockFilter = filterConditions.toBlock;
}
const toBlockInPast = toBlockFilter && toBlockFilter < lastBlockNumberAtLoad;
const hardLimit = toBlockInPast ? toBlockFilter : null;
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
if (filterConditions.toBlock === "latest") {
// emit DB Events [fromBlock, lastKnownBlock]
serveDBEvents(filterConditions, lastKnownBlock);
// create a event subscription [lastKnownBlock + 1, ...]
let filters = Object.assign({}, filterConditions, {
fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1
});
return subscribe(this.subscriptions, filters);
} else if (filterConditions.toBlock <= lastKnownBlock) {
// emit DB Events [fromBlock, toBlock]
serveDBEvents(filterConditions, filterConditions.toBlock);
} else {
// emit DB Events [fromBlock, lastKnownBlock]
serveDBEvents(filterConditions, lastKnownBlock);
// get past events [lastKnownBlock + 1, toBlock]
const fromBlock = filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1;
await getPastEvents(fromBlock, filterConditions.toBlock, hardLimit);
}
} else if (firstKnownBlock > 0) {
// get past events [ firstKnownBlock > fromBlock ? fromBlock : 0, firstKnownBlock - 1]
const fromBlock = firstKnownBlock > filterConditions.fromBlock ? filterConditions.fromBlock : 0;
await getPastEvents(fromBlock, firstKnownBlock - 1, hardLimit);
if (filterConditions.toBlock === "latest") {
// emit DB Events [firstKnownBlock, lastKnownBlock]
serveDBEvents(filterConditions, lastKnownBlock, firstKnownBlock);
// create a subscription [lastKnownBlock + 1, ...]
const filters = Object.assign({}, filterConditions, {fromBlock: lastKnownBlock + 1});
return subscribe(this.subscriptions, filters);
} else if (filterConditions.toBlock <= lastKnownBlock) {
// emit DB Events [fromBlock, toBlock]
serveDBEvents(filterConditions, filterConditions.toBlock);
} else {
// emit DB Events [fromBlock, lastKnownBlock]
serveDBEvents(filterConditions, lastKnownBlock);
// get past events [lastKnownBlock + 1, toBlock]
await getPastEvents(lastKnownBlock + 1, filterConditions.toBlock, hardLimit);
}
}
}
close() {
this.subscriptions.forEach(x => x.unsubscribe());
}
}
export default WsEventScanner;