feat: support both http and ws providers (#61)
* feat: using polling instead of subscriptions for getting events * calling data from db * feat: http event syncer WIP * support both http and ws providers
This commit is contained in:
parent
a9f10e04a0
commit
dc65a68868
6
.babelrc
6
.babelrc
|
@ -1,4 +1,4 @@
|
|||
{
|
||||
"plugins": ["@babel/plugin-proposal-class-properties"],
|
||||
"presets": ["@babel/preset-env"],
|
||||
}
|
||||
"plugins": ["@babel/transform-runtime", "@babel/plugin-proposal-class-properties"],
|
||||
"presets": ["@babel/preset-env"]
|
||||
}
|
|
@ -41,6 +41,8 @@
|
|||
"@babel/cli": "^7.1.5",
|
||||
"@babel/core": "^7.1.6",
|
||||
"@babel/plugin-proposal-class-properties": "^7.5.5",
|
||||
"@babel/plugin-transform-runtime": "^7.6.2",
|
||||
"@babel/runtime": "^7.7.2",
|
||||
"@babel/preset-env": "^7.1.6",
|
||||
"add-module-exports-webpack-plugin": "^1.0.0",
|
||||
"babel-loader": "^8.0.4",
|
||||
|
|
|
@ -50,21 +50,22 @@ class Database {
|
|||
|
||||
getLastKnownEvent(eventKey) {
|
||||
const collection = this.db.getCollection(eventKey);
|
||||
|
||||
let firstKnownBlock = 0;
|
||||
let lastKnownBlock = 0;
|
||||
|
||||
if (collection && collection.count()){
|
||||
firstKnownBlock = collection.min('blockNumber');
|
||||
lastKnownBlock = collection.max('blockNumber');
|
||||
return collection.max('blockNumber');
|
||||
} else {
|
||||
this.db.addCollection(eventKey);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
return {
|
||||
firstKnownBlock: firstKnownBlock || 0,
|
||||
lastKnownBlock: lastKnownBlock || 0
|
||||
};
|
||||
getFirstKnownEvent(eventKey) {
|
||||
const collection = this.db.getCollection(eventKey);
|
||||
if (collection && collection.count()){
|
||||
return collection.min('blockNumber');
|
||||
} else {
|
||||
this.db.addCollection(eventKey);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
getEventsFor(eventKey) {
|
||||
|
@ -94,6 +95,8 @@ class Database {
|
|||
}
|
||||
|
||||
deleteNewestBlocks(eventKey, gteBlockNum) {
|
||||
if(gteBlockNum <= 0) return;
|
||||
|
||||
const collection = this.db.getCollection(eventKey);
|
||||
if(collection)
|
||||
collection.chain().find({ 'blockNumber': {'$gte': gteBlockNum}}).remove();
|
||||
|
|
|
@ -1,36 +1,36 @@
|
|||
import { fromEvent, ReplaySubject } from 'rxjs';
|
||||
import hash from 'object-hash';
|
||||
import HttpEventScanner from './httpEventScanner';
|
||||
import WsEventScanner from './wsEventScanner';
|
||||
|
||||
class EventSyncer {
|
||||
|
||||
constructor(web3, events, db) {
|
||||
constructor(web3, events, db, isWebsocketProvider) {
|
||||
this.events = events;
|
||||
this.web3 = web3;
|
||||
this.db = db;
|
||||
this.subscriptions = [];
|
||||
this.isWebsocketProvider = isWebsocketProvider;
|
||||
this.eventScanner = isWebsocketProvider ? new WsEventScanner(web3) : new HttpEventScanner(web3);
|
||||
}
|
||||
|
||||
track(contractInstance, eventName, filterConditionsOrCb, gteBlockNum, networkId) {
|
||||
const isFilterFunction = typeof filterConditionsOrCb === 'function';
|
||||
const eventKey = hash(Object.assign({address: contractInstance.options.address, networkId}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}))));
|
||||
track(contractInstance, eventName, filters, gteBlockNum, networkId) {
|
||||
const eventKey = hash(Object.assign({address: contractInstance.options.address, networkId}, (filters || {})));
|
||||
|
||||
this.db.deleteNewestBlocks(eventKey, gteBlockNum);
|
||||
|
||||
let filterConditions = {fromBlock: 0, toBlock: "latest"};
|
||||
let filterConditionsCb;
|
||||
if (isFilterFunction) {
|
||||
filterConditionsCb = filterConditionsOrCb;
|
||||
} else {
|
||||
filterConditions = Object.assign(filterConditions, filterConditionsOrCb || {});
|
||||
}
|
||||
let filterConditions = Object.assign({fromBlock: 0, toBlock: "latest"}, filters || {});
|
||||
let lastKnownBlock = this.db.getLastKnownEvent(eventKey);
|
||||
let firstKnownBlock = this.db.getFirstKnownEvent(eventKey);
|
||||
|
||||
let eventSummary = this.db.getLastKnownEvent(eventKey);
|
||||
|
||||
let sub = new ReplaySubject();
|
||||
let contractObserver = fromEvent(this.events, eventKey)
|
||||
|
||||
contractObserver.subscribe((e) => {
|
||||
if (!e) return;
|
||||
if (!e) {
|
||||
sub.complete();
|
||||
return;
|
||||
}
|
||||
|
||||
const id = hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex});
|
||||
|
||||
|
@ -45,7 +45,6 @@ class EventSyncer {
|
|||
}
|
||||
|
||||
// TODO: test reorgs
|
||||
|
||||
sub.next({blockNumber: e.blockNumber, ...e.returnValues});
|
||||
|
||||
if (e.removed){
|
||||
|
@ -60,100 +59,60 @@ class EventSyncer {
|
|||
this.events.emit("updateDB");
|
||||
});
|
||||
|
||||
const eth_subscribe = this._retrieveEvents(eventKey,
|
||||
eventSummary.firstKnownBlock,
|
||||
eventSummary.lastKnownBlock,
|
||||
filterConditions,
|
||||
filterConditionsCb,
|
||||
contractInstance,
|
||||
eventName);
|
||||
|
||||
const og_subscribe = sub.subscribe;
|
||||
sub.subscribe = (next, error, complete) => {
|
||||
const s = og_subscribe.apply(sub, [next, error, complete]);
|
||||
s.add(() => { // Removing web3js subscription when rxJS unsubscribe is executed
|
||||
if(eth_subscribe) eth_subscribe.unsubscribe();
|
||||
});
|
||||
return s;
|
||||
const fnDBEvents = this.serveDBEvents(eventKey);
|
||||
const fnPastEvents = this.getPastEvents(eventKey, contractInstance, eventName, filters);
|
||||
|
||||
if(this.isWebsocketProvider){
|
||||
const fnSubscribe = this.subscribeToEvent(eventKey, contractInstance, eventName);
|
||||
const eth_subscribe = this.eventScanner.scan(fnDBEvents, fnPastEvents, fnSubscribe, firstKnownBlock, lastKnownBlock, filterConditions);
|
||||
|
||||
const og_subscribe = sub.subscribe;
|
||||
sub.subscribe = async (next, error, complete) => {
|
||||
const s = og_subscribe.apply(sub, [next, error, complete]);
|
||||
s.add(() => { // Removing web3js subscription when rxJS unsubscribe is executed
|
||||
eth_subscribe.then(susc => {
|
||||
if(susc) {
|
||||
susc.unsubscribe();
|
||||
}
|
||||
});
|
||||
});
|
||||
return s;
|
||||
}
|
||||
} else {
|
||||
this.eventScanner.scan(fnDBEvents, fnPastEvents, lastKnownBlock, filterConditions);
|
||||
}
|
||||
|
||||
return sub;
|
||||
}
|
||||
|
||||
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) {
|
||||
// TODO: this should be moved to a 'smart' module
|
||||
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
||||
getPastEvents = (eventKey, contractInstance, eventName, filters) => async (fromBlock, toBlock, hardLimit) => {
|
||||
let events = await contractInstance.getPastEvents(eventName, { ...filters, fromBlock, toBlock });
|
||||
const cb = this.callbackFactory(filters, eventKey);
|
||||
|
||||
events.forEach(ev => cb(null, ev));
|
||||
|
||||
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
|
||||
if (filterConditions.toBlock === 'latest') {
|
||||
// emit DB Events [fromBlock, lastKnownBlock]
|
||||
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
|
||||
// create a event subscription [lastKnownBlock + 1, ...]
|
||||
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
|
||||
return this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
|
||||
}
|
||||
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||
// emit DB Events [fromBlock, toBlock]
|
||||
this._serveDBEvents(eventKey, filterConditions.fromBlock, filterConditions.toBlock, filterConditions, filterConditionsCb);
|
||||
}
|
||||
else {
|
||||
// emit DB Events [fromBlock, lastKnownBlock]
|
||||
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
|
||||
// create a past event subscription [lastKnownBlock + 1, toBlock]
|
||||
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
|
||||
this._getPastEvents(contractInstance, eventName, filters, filterConditionsCb, eventKey);
|
||||
}
|
||||
}
|
||||
else if (firstKnownBlock > 0) {
|
||||
// create a past event subscription [ firstKnownBlock > fromBlock ? fromBlock : 0, firstKnownBlock - 1]
|
||||
let fromBlock = firstKnownBlock > filterConditions.fromBlock ? filterConditions.fromBlock : 0;
|
||||
let filters = Object.assign({}, filterConditions, { fromBlock, toBlock: firstKnownBlock - 1 });
|
||||
this._getPastEvents(contractInstance, eventName, filters, filterConditionsCb, eventKey);
|
||||
if (filterConditions.toBlock === 'latest') {
|
||||
// emit DB Events [firstKnownBlock, lastKnownBlock]
|
||||
this._serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb);
|
||||
// create a subscription [lastKnownBlock + 1, ...]
|
||||
const filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1 });
|
||||
return this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
|
||||
}
|
||||
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||
// emit DB Events [fromBlock, toBlock]
|
||||
this._serveDBEvents(eventKey, filterConditions.fromBlock, filterConditions.toBlock, filterConditions, filterConditionsCb);
|
||||
}
|
||||
else {
|
||||
// emit DB Events [fromBlock, lastKnownBlock]
|
||||
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
|
||||
// create a past event subscription [lastKnownBlock + 1, toBlock]
|
||||
let filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1, toBlock: filterConditions.toBlock });
|
||||
this._getPastEvents(contractInstance, eventName, filters, filterConditionsCb, eventKey);
|
||||
}
|
||||
if(hardLimit && toBlock === hardLimit){ // Complete the observable
|
||||
this.events.emit(eventKey);
|
||||
}
|
||||
}
|
||||
|
||||
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) {
|
||||
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
|
||||
const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
|
||||
serveDBEvents = eventKey => (filters, toBlock, fromBlock = null) => {
|
||||
const cb = this.callbackFactory(filters, eventKey);
|
||||
const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= (fromBlock || filters.fromBlock) && x.blockNumber <= toBlock);
|
||||
storedEvents.forEach(ev => {
|
||||
cb(null, ev);
|
||||
});
|
||||
}
|
||||
|
||||
_getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) {
|
||||
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
|
||||
contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => {
|
||||
events.forEach(ev => {
|
||||
cb(err, ev);
|
||||
});
|
||||
}]);
|
||||
}
|
||||
|
||||
_subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) {
|
||||
const s = event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
|
||||
this.subscriptions.push(s);
|
||||
subscribeToEvent = (eventKey, contractInstance, eventName) => (subscriptions, filters) => {
|
||||
const cb = this.callbackFactory(filters, eventKey);
|
||||
const s = contractInstance.events[eventName](filters, cb);
|
||||
subscriptions.push(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
|
||||
callbackFactory = (filterConditions, eventKey) => (err, ev) => {
|
||||
if (err) {
|
||||
console.error(err);
|
||||
return;
|
||||
|
@ -170,16 +129,12 @@ class EventSyncer {
|
|||
if (filterConditions.filter[prop] !== ev.returnValues[prop]) return;
|
||||
}
|
||||
}
|
||||
else if (filterConditionsCb && !filterConditionsCb(ev.returnValues)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.events.emit(eventKey, ev);
|
||||
}
|
||||
|
||||
close(){
|
||||
this.subscriptions.forEach(x => {
|
||||
x.unsubscribe();
|
||||
})
|
||||
this.eventScanner.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
import {sleep} from './utils'
|
||||
|
||||
class HttpEventScanner {
|
||||
constructor(web3){
|
||||
this.pollExecution = [];
|
||||
this.web3 = web3;
|
||||
}
|
||||
|
||||
async poll(execId, fn, timeout){
|
||||
const shouldStop = await fn();
|
||||
if(!this.pollExecution[execId] || shouldStop) return;
|
||||
if(timeout) await sleep(timeout * 1000);
|
||||
await this.poll(execId, fn, timeout);
|
||||
}
|
||||
|
||||
async scan(serveDBEvents, getPastEvents, lastCachedBlock, filterConditions) {
|
||||
const execId = this.pollExecution.push(true) - 1;
|
||||
const maxBlockRange = 500000; // TODO: extract to config
|
||||
const lastBlockNumberAtLoad = await this.web3.getBlockNumber();
|
||||
|
||||
// If there's a toBlock with a number
|
||||
let toBlockFilter = 0;
|
||||
if(filterConditions.toBlock && filterConditions.toBlock !== 'latest' ){
|
||||
toBlockFilter = filterConditions.toBlock;
|
||||
}
|
||||
const toBlockInPast = toBlockFilter && toBlockFilter < lastBlockNumberAtLoad;
|
||||
|
||||
// Determine if data already exists and return it.
|
||||
let dbLimit = toBlockFilter > 0 ? Math.min(toBlockFilter, lastCachedBlock) : lastCachedBlock;
|
||||
if(lastCachedBlock > 0 && filterConditions.fromBlock >= 0){
|
||||
serveDBEvents(filterConditions, dbLimit);
|
||||
lastCachedBlock = lastCachedBlock + 1;
|
||||
}
|
||||
|
||||
lastCachedBlock = Math.max(lastCachedBlock, filterConditions.fromBlock||0);
|
||||
|
||||
// Get old events and store them in db
|
||||
await this.poll(execId, async () => {
|
||||
try {
|
||||
const maxBlock = Math.min(lastCachedBlock + maxBlockRange, lastBlockNumberAtLoad);
|
||||
const toBlock = toBlockInPast ? Math.min(maxBlock, toBlockFilter) : maxBlock;
|
||||
const toBlockLimit = Math.min(await this.web3.getBlockNumber(), toBlock);
|
||||
|
||||
if(toBlockLimit >= lastCachedBlock) {
|
||||
await getPastEvents(lastCachedBlock, toBlockLimit, toBlockInPast ? toBlockFilter : null);
|
||||
lastCachedBlock = toBlockLimit + 1;
|
||||
}
|
||||
} catch (e) {
|
||||
console.log(e.toString());
|
||||
}
|
||||
|
||||
// Should exit?
|
||||
return (toBlockInPast && lastCachedBlock >= (toBlockFilter || 0)) || (lastCachedBlock > Math.max(lastBlockNumberAtLoad, toBlockInPast ? toBlockFilter || 0 : 0));
|
||||
});
|
||||
|
||||
if(toBlockInPast) return;
|
||||
|
||||
// Get new data, with a timeout between requests
|
||||
await this.poll(execId, async () => {
|
||||
try {
|
||||
let toBlockLimit = await this.web3.getBlockNumber()
|
||||
if(toBlockLimit >= lastCachedBlock) {
|
||||
await getPastEvents(lastCachedBlock, toBlockLimit, toBlockFilter || 0);
|
||||
lastCachedBlock = toBlockLimit + 1;
|
||||
}
|
||||
} catch (e) {
|
||||
console.log(e.toString());
|
||||
}
|
||||
|
||||
// Should exit?
|
||||
return filterConditions.toBlock !== 'latest' && lastCachedBlock > Math.max(lastBlockNumberAtLoad, toBlockFilter || 0);
|
||||
}, 1);
|
||||
|
||||
}
|
||||
|
||||
close(){
|
||||
this.pollExecution = Array(this.pollExecution.length).fill(false);
|
||||
}
|
||||
}
|
||||
|
||||
export default HttpEventScanner;
|
|
@ -16,6 +16,7 @@ export default class Subspace {
|
|||
constructor(provider, options = {}) {
|
||||
if (provider.constructor.name !== "WebsocketProvider") {
|
||||
console.warn("subspace: it's recommended to use a websocket provider to react to new events");
|
||||
console.warn("If this provider supports websockets, use {useWebsockets: true} in subspace options");
|
||||
}
|
||||
|
||||
this.events = new Events();
|
||||
|
@ -28,7 +29,8 @@ export default class Subspace {
|
|||
this.latestBlockNumber = undefined;
|
||||
this.disableDatabase = options.disableDatabase;
|
||||
this.networkId = undefined;
|
||||
|
||||
this.isWebsocketProvider = provider.constructor.name === "WebsocketProvider" || options.useWebsockets
|
||||
|
||||
this.newBlocksSubscription = null;
|
||||
this.intervalTracker = null;
|
||||
this.callables = [];
|
||||
|
@ -41,7 +43,7 @@ export default class Subspace {
|
|||
} else {
|
||||
this._db = new Database(this.options.dbFilename, this.events);
|
||||
}
|
||||
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db);
|
||||
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db, this.isWebsocketProvider);
|
||||
this.logSyncer = new LogSyncer(this.web3, this.events, this._db);
|
||||
|
||||
this.web3.net.getId().then(netId => {
|
||||
|
@ -51,8 +53,12 @@ export default class Subspace {
|
|||
this.web3.getBlock('latest').then(block => {
|
||||
this.latestBlockNumber = block.number;
|
||||
|
||||
this._initNewBlocksSubscription();
|
||||
this._initCallInterval();
|
||||
if(this.isWebsocketProvider){
|
||||
this._initNewBlocksSubscription();
|
||||
} else {
|
||||
this.options.callInterval = this.options.callInterval || 1000;
|
||||
this._initCallInterval();
|
||||
}
|
||||
|
||||
resolve();
|
||||
})
|
||||
|
@ -114,7 +120,8 @@ export default class Subspace {
|
|||
|
||||
// TODO: get contract abi/address instead
|
||||
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
|
||||
let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId);
|
||||
let deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks;
|
||||
let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb, deleteFrom, this.networkId);
|
||||
|
||||
returnSub.map = (prop) => {
|
||||
return returnSub.pipe(map((x) => {
|
||||
|
@ -143,6 +150,7 @@ export default class Subspace {
|
|||
}
|
||||
|
||||
trackLogs(options, inputsABI) {
|
||||
if(!this.isWebsocketProvider) console.warn("This method only works with websockets");
|
||||
return this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId);
|
||||
}
|
||||
|
||||
|
@ -151,7 +159,7 @@ export default class Subspace {
|
|||
|
||||
this.newBlocksSubscription = this.web3.subscribe('newBlockHeaders', (err, result) => {
|
||||
if (err) {
|
||||
sub.error(err);
|
||||
console.error(err);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -6,3 +6,8 @@ export function randomString() {
|
|||
export function isAddress(address) {
|
||||
return /^(0x)?[0-9a-fA-F]{40}$/i.test(address)
|
||||
};
|
||||
|
||||
|
||||
export function sleep(milliseconds) {
|
||||
return new Promise(resolve => setTimeout(resolve, milliseconds));
|
||||
};
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
class WsEventScanner {
|
||||
constructor(web3) {
|
||||
this.web3 = web3;
|
||||
this.subscriptions = [];
|
||||
}
|
||||
|
||||
async scan(serveDBEvents, getPastEvents, subscribe, firstKnownBlock, lastKnownBlock, filterConditions) {
|
||||
const lastBlockNumberAtLoad = await this.web3.getBlockNumber();
|
||||
|
||||
// If there's a toBlock with a number
|
||||
let toBlockFilter = 0;
|
||||
if(filterConditions.toBlock && filterConditions.toBlock !== 'latest' ){
|
||||
toBlockFilter = filterConditions.toBlock;
|
||||
}
|
||||
const toBlockInPast = toBlockFilter && toBlockFilter < lastBlockNumberAtLoad;
|
||||
const hardLimit = toBlockInPast ? toBlockFilter : null;
|
||||
|
||||
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
|
||||
if (filterConditions.toBlock === 'latest') {
|
||||
// emit DB Events [fromBlock, lastKnownBlock]
|
||||
serveDBEvents(filterConditions, lastKnownBlock);
|
||||
// create a event subscription [lastKnownBlock + 1, ...]
|
||||
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
|
||||
return subscribe(this.subscriptions, filters);
|
||||
}
|
||||
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||
// emit DB Events [fromBlock, toBlock]
|
||||
serveDBEvents(filterConditions, filterConditions.toBlock);
|
||||
}
|
||||
else {
|
||||
// emit DB Events [fromBlock, lastKnownBlock]
|
||||
serveDBEvents(filterConditions, lastKnownBlock);
|
||||
// get past events [lastKnownBlock + 1, toBlock]
|
||||
const fromBlock = filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1;
|
||||
await getPastEvents(fromBlock, filterConditions.toBlock, hardLimit);
|
||||
}
|
||||
}
|
||||
else if (firstKnownBlock > 0) {
|
||||
// get past events [ firstKnownBlock > fromBlock ? fromBlock : 0, firstKnownBlock - 1]
|
||||
const fromBlock = firstKnownBlock > filterConditions.fromBlock ? filterConditions.fromBlock : 0;
|
||||
await getPastEvents(fromBlock, firstKnownBlock - 1, hardLimit);
|
||||
|
||||
if (filterConditions.toBlock === 'latest') {
|
||||
// emit DB Events [firstKnownBlock, lastKnownBlock]
|
||||
serveDBEvents(filterConditions, lastKnownBlock, firstKnownBlock);
|
||||
// create a subscription [lastKnownBlock + 1, ...]
|
||||
const filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1 });
|
||||
return subscribe(this.subscriptions, filters);
|
||||
}
|
||||
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||
// emit DB Events [fromBlock, toBlock]
|
||||
serveDBEvents(filterConditions, filterConditions.toBlock);
|
||||
}
|
||||
else {
|
||||
// emit DB Events [fromBlock, lastKnownBlock]
|
||||
serveDBEvents(filterConditions, lastKnownBlock);
|
||||
// get past events [lastKnownBlock + 1, toBlock]
|
||||
await getPastEvents(lastKnownBlock + 1, filterConditions.toBlock, hardLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(){
|
||||
this.subscriptions.forEach(x => x.unsubscribe());
|
||||
}
|
||||
}
|
||||
|
||||
export default WsEventScanner;
|
26
yarn.lock
26
yarn.lock
|
@ -539,6 +539,16 @@
|
|||
dependencies:
|
||||
"@babel/helper-plugin-utils" "^7.0.0"
|
||||
|
||||
"@babel/plugin-transform-runtime@^7.6.2":
|
||||
version "7.6.2"
|
||||
resolved "https://registry.yarnpkg.com/@babel/plugin-transform-runtime/-/plugin-transform-runtime-7.6.2.tgz#2669f67c1fae0ae8d8bf696e4263ad52cb98b6f8"
|
||||
integrity sha512-cqULw/QB4yl73cS5Y0TZlQSjDvNkzDbu0FurTZyHlJpWE5T3PCMdnyV+xXoH1opr1ldyHODe3QAX3OMAii5NxA==
|
||||
dependencies:
|
||||
"@babel/helper-module-imports" "^7.0.0"
|
||||
"@babel/helper-plugin-utils" "^7.0.0"
|
||||
resolve "^1.8.1"
|
||||
semver "^5.5.1"
|
||||
|
||||
"@babel/plugin-transform-shorthand-properties@^7.2.0":
|
||||
version "7.2.0"
|
||||
resolved "https://registry.yarnpkg.com/@babel/plugin-transform-shorthand-properties/-/plugin-transform-shorthand-properties-7.2.0.tgz#6333aee2f8d6ee7e28615457298934a3b46198f0"
|
||||
|
@ -641,6 +651,13 @@
|
|||
js-levenshtein "^1.1.3"
|
||||
semver "^5.5.0"
|
||||
|
||||
"@babel/runtime@^7.7.2":
|
||||
version "7.7.2"
|
||||
resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.7.2.tgz#111a78002a5c25fc8e3361bedc9529c696b85a6a"
|
||||
integrity sha512-JONRbXbTXc9WQE2mAZd1p0Z3DZ/6vaQIkgYMSTP3KjRCyd7rCZCcfhCyX+YjwcKxcZ82UrxbRD358bpExNgrjw==
|
||||
dependencies:
|
||||
regenerator-runtime "^0.13.2"
|
||||
|
||||
"@babel/template@^7.1.0", "@babel/template@^7.4.4", "@babel/template@^7.6.0":
|
||||
version "7.6.0"
|
||||
resolved "https://registry.yarnpkg.com/@babel/template/-/template-7.6.0.tgz#7f0159c7f5012230dad64cca42ec9bdb5c9536e6"
|
||||
|
@ -6068,6 +6085,11 @@ regenerator-runtime@^0.11.0:
|
|||
resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.11.1.tgz#be05ad7f9bf7d22e056f9726cee5017fbf19e2e9"
|
||||
integrity sha512-MguG95oij0fC3QV3URf4V2SDYGJhJnJGqvIIgdECeODCT98wSWDAJ94SSuVpYQUoTcGUIL6L4yNB7j1DFFHSBg==
|
||||
|
||||
regenerator-runtime@^0.13.2:
|
||||
version "0.13.3"
|
||||
resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.13.3.tgz#7cf6a77d8f5c6f60eb73c5fc1955b2ceb01e6bf5"
|
||||
integrity sha512-naKIZz2GQ8JWh///G7L3X6LaQUAMp2lvb1rvwwsURe/VXwD6VMfr+/1NuNw3ag8v2kY1aQ/go5SNn79O9JU7yw==
|
||||
|
||||
regenerator-transform@^0.10.0:
|
||||
version "0.10.1"
|
||||
resolved "https://registry.yarnpkg.com/regenerator-transform/-/regenerator-transform-0.10.1.tgz#1e4996837231da8b7f3cf4114d71b5691a0680dd"
|
||||
|
@ -6220,7 +6242,7 @@ resolve-url@^0.2.1:
|
|||
resolved "https://registry.yarnpkg.com/resolve-url/-/resolve-url-0.2.1.tgz#2c637fe77c893afd2a663fe21aa9080068e2052a"
|
||||
integrity sha1-LGN/53yJOv0qZj/iGqkIAGjiBSo=
|
||||
|
||||
resolve@^1.10.0, resolve@^1.3.2:
|
||||
resolve@^1.10.0, resolve@^1.3.2, resolve@^1.8.1:
|
||||
version "1.12.0"
|
||||
resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.12.0.tgz#3fc644a35c84a48554609ff26ec52b66fa577df6"
|
||||
integrity sha512-B/dOmuoAik5bKcD6s6nXDCjzUKnaDvdkRyAk6rsmsKLipWj4797iothd7jmmUhWTfinVMU+wc56rYKsit2Qy4w==
|
||||
|
@ -6402,7 +6424,7 @@ semaphore@>=1.0.1, semaphore@^1.0.3, semaphore@^1.1.0:
|
|||
resolved "https://registry.yarnpkg.com/semaphore/-/semaphore-1.1.0.tgz#aaad8b86b20fe8e9b32b16dc2ee682a8cd26a8aa"
|
||||
integrity sha512-O4OZEaNtkMd/K0i6js9SL+gqy0ZCBMgUvlSqHKi4IBdjhe7wB8pwztUk1BbZ1fmrvpwFrPbHzqd2w5pTcJH6LA==
|
||||
|
||||
"semver@2 || 3 || 4 || 5", semver@^5.3.0, semver@^5.4.1, semver@^5.5.0, semver@^5.6.0:
|
||||
"semver@2 || 3 || 4 || 5", semver@^5.3.0, semver@^5.4.1, semver@^5.5.0, semver@^5.5.1, semver@^5.6.0:
|
||||
version "5.7.1"
|
||||
resolved "https://registry.yarnpkg.com/semver/-/semver-5.7.1.tgz#a954f931aeba508d307bbf069eff0c01c96116f7"
|
||||
integrity sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==
|
||||
|
|
Loading…
Reference in New Issue