mirror of
https://github.com/embarklabs/subspace.git
synced 2025-02-17 00:56:31 +00:00
refactor: extract event tracking to file (#30)
This commit is contained in:
parent
2a213f053b
commit
454cfdecfd
@ -1,57 +1,34 @@
|
|||||||
import { fromEvent, ReplaySubject } from 'rxjs';
|
import { fromEvent, ReplaySubject } from 'rxjs';
|
||||||
import { distinctUntilChanged } from 'rxjs/operators';
|
|
||||||
import equal from 'fast-deep-equal';
|
|
||||||
import hash from 'object-hash';
|
import hash from 'object-hash';
|
||||||
import Database from './database.js';
|
|
||||||
import Events from 'events';
|
|
||||||
import Web3Eth from 'web3-eth';
|
|
||||||
import stripHexPrefix from 'strip-hex-prefix';
|
|
||||||
import toBN from 'number-to-bn';
|
|
||||||
|
|
||||||
export default class EventSyncer {
|
class EventSyncer {
|
||||||
|
constructor(web3, events, db) {
|
||||||
constructor(provider, options = {}) {
|
this.events = events;
|
||||||
this.events = new Events();
|
this.web3 = web3;
|
||||||
this.web3 = new Web3Eth(provider);
|
this.db = db;
|
||||||
|
|
||||||
this.options = {};
|
|
||||||
this.options.callInterval = options.callInterval || 0;
|
|
||||||
this.options.dbFilename = options.dbFilename || 'phoenix.db';
|
|
||||||
|
|
||||||
this.newBlocksSubscription = null;
|
|
||||||
this.intervalTracker = null;
|
|
||||||
this.callables = [];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
init() {
|
track(contractInstance, eventName, filterConditionsOrCb){
|
||||||
return new Promise((resolve, reject) => {
|
const isFilterFunction = typeof filterConditionsOrCb === 'function';
|
||||||
this._db = new Database(this.options.dbFilename, this.events, resolve);
|
const eventKey = eventName + '-' + hash(isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}));
|
||||||
this.db = this._db.db;
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: get contract abi/address instead
|
|
||||||
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
|
|
||||||
let eventKey = eventName + '-' + hash(filterConditionsOrCb);
|
|
||||||
|
|
||||||
let filterConditions = {fromBlock: 0, toBlock: "latest"};
|
let filterConditions = {fromBlock: 0, toBlock: "latest"};
|
||||||
let filterConditionsCb;
|
let filterConditionsCb;
|
||||||
if (typeof filterConditionsOrCb === 'function') {
|
if (isFilterFunction) {
|
||||||
filterConditionsCb = filterConditionsOrCb;
|
filterConditionsCb = filterConditionsOrCb;
|
||||||
} else {
|
} else {
|
||||||
filterConditions = Object.assign(filterConditions, filterConditionsOrCb || {});
|
filterConditions = Object.assign(filterConditions, filterConditionsOrCb || {});
|
||||||
}
|
}
|
||||||
|
|
||||||
let eventSummary = this._db.getLastKnownEvent(eventKey);
|
let eventSummary = this.db.getLastKnownEvent(eventKey);
|
||||||
|
|
||||||
let sub = new ReplaySubject();
|
let sub = new ReplaySubject();
|
||||||
let contractObserver = fromEvent(this.events, eventKey)
|
let contractObserver = fromEvent(this.events, eventKey)
|
||||||
|
|
||||||
contractObserver.subscribe((e) => {
|
contractObserver.subscribe((e) => {
|
||||||
if(!e) return;
|
if(!e) return;
|
||||||
|
|
||||||
// TODO: would be nice if trackEvent was smart enough to understand the type of returnValues and do the needed conversions
|
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
|
||||||
|
|
||||||
const eventData = {
|
const eventData = {
|
||||||
id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}),
|
id: hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}),
|
||||||
returnValues: {...e.returnValues},
|
returnValues: {...e.returnValues},
|
||||||
@ -62,9 +39,9 @@ export default class EventSyncer {
|
|||||||
|
|
||||||
sub.next({blockNumber: e.blockNumber, ...e.returnValues});
|
sub.next({blockNumber: e.blockNumber, ...e.returnValues});
|
||||||
|
|
||||||
if (this._db.eventExists(eventKey, eventData.id)) return;
|
if (this.db.eventExists(eventKey, eventData.id)) return;
|
||||||
|
|
||||||
this._db.recordEvent(eventKey, eventData);
|
this.db.recordEvent(eventKey, eventData);
|
||||||
|
|
||||||
this.events.emit("updateDB");
|
this.events.emit("updateDB");
|
||||||
});
|
});
|
||||||
@ -80,12 +57,11 @@ export default class EventSyncer {
|
|||||||
return sub;
|
return sub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) {
|
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) {
|
||||||
// TODO: this should be moved to a 'smart' module
|
// TODO: this should be moved to a 'smart' module
|
||||||
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
||||||
// TODO: filter subscriptions with fromBlock and toBlock
|
// TODO: filter subscriptions with fromBlock and toBlock
|
||||||
|
|
||||||
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
|
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
|
||||||
if (filterConditions.toBlock === 'latest') {
|
if (filterConditions.toBlock === 'latest') {
|
||||||
// emit DB Events [fromBlock, lastKnownBlock]
|
// emit DB Events [fromBlock, lastKnownBlock]
|
||||||
@ -131,32 +107,31 @@ export default class EventSyncer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) {
|
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) {
|
||||||
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
|
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
|
||||||
const storedEvents = this._db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
|
const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
|
||||||
storedEvents.forEach(ev => {
|
storedEvents.forEach(ev => {
|
||||||
cb(null, ev);
|
cb(null, ev);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
_getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) {
|
_getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) {
|
||||||
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
|
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
|
||||||
contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => {
|
contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => {
|
||||||
events.forEach(ev => {
|
events.forEach(ev => {
|
||||||
cb(err, ev);
|
cb(err, ev);
|
||||||
});
|
});
|
||||||
} ]);
|
}]);
|
||||||
}
|
}
|
||||||
|
|
||||||
_subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) {
|
_subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) {
|
||||||
event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
|
event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
|
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
|
||||||
if(err) return;
|
if(err) return;
|
||||||
|
|
||||||
if (filterConditions) {
|
if (filterConditions) {
|
||||||
let propsToFilter = [];
|
let propsToFilter = [];
|
||||||
for (let prop in filterConditions.filter) {
|
for (let prop in filterConditions.filter) {
|
||||||
@ -165,118 +140,14 @@ export default class EventSyncer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (let prop of propsToFilter) {
|
for (let prop of propsToFilter) {
|
||||||
if (filterConditions.filter[prop] !== ev.returnValues[prop])
|
if (filterConditions.filter[prop] !== ev.returnValues[prop]) return;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (filterConditionsCb && !filterConditionsCb(ev.returnValues)) {
|
else if (filterConditionsCb && !filterConditionsCb(ev.returnValues)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.events.emit(eventKey, ev);
|
this.events.emit(eventKey, ev);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_initNewBlocksSubscription() {
|
export default EventSyncer;
|
||||||
if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
|
|
||||||
|
|
||||||
this.newBlocksSubscription = this.web3.subscribe('newBlockHeaders', (err, result) => {
|
|
||||||
if(err) {
|
|
||||||
sub.error(err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.callables.forEach(fn => {
|
|
||||||
fn();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
_initCallInterval() {
|
|
||||||
if(this.intervalTracker != null || this.options.callInterval === 0) return;
|
|
||||||
|
|
||||||
this.intervalTracker = setInterval(() => {
|
|
||||||
this.callables.forEach(fn => {
|
|
||||||
fn();
|
|
||||||
});
|
|
||||||
}, this.options.callInterval);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: should save value in database?
|
|
||||||
trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) {
|
|
||||||
|
|
||||||
const sub = new ReplaySubject();
|
|
||||||
|
|
||||||
const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs)
|
|
||||||
const callContractMethod = () => {
|
|
||||||
method.call.apply(method.call, [callArgs, (err, result) => {
|
|
||||||
if(err) {
|
|
||||||
sub.error(err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sub.next(result);
|
|
||||||
}]);
|
|
||||||
};
|
|
||||||
|
|
||||||
callContractMethod();
|
|
||||||
|
|
||||||
this._initNewBlocksSubscription();
|
|
||||||
this._initCallInterval();
|
|
||||||
|
|
||||||
this.callables.push(callContractMethod);
|
|
||||||
|
|
||||||
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: should save value in database?
|
|
||||||
trackBalance(address, erc20Address) {
|
|
||||||
const sub = new ReplaySubject();
|
|
||||||
|
|
||||||
// TODO: validate address?
|
|
||||||
|
|
||||||
let callFn;
|
|
||||||
if(!erc20Address){
|
|
||||||
callFn = () => {
|
|
||||||
const fn = this.web3.getBalance;
|
|
||||||
|
|
||||||
fn.apply(fn, [address, (err, balance) => {
|
|
||||||
if(err) {
|
|
||||||
sub.error(err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sub.next(balance);
|
|
||||||
}]);
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
callFn = () => {
|
|
||||||
const fn = this.web3.call;
|
|
||||||
// balanceOf
|
|
||||||
const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(erc20Address);
|
|
||||||
fn.apply(fn, [{to: erc20Address, data}, (err, result) => {
|
|
||||||
if(err) {
|
|
||||||
sub.error(err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sub.next(toBN(result).toString(10));
|
|
||||||
}]);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
callFn();
|
|
||||||
|
|
||||||
this._initNewBlocksSubscription();
|
|
||||||
this._initCallInterval();
|
|
||||||
|
|
||||||
this.callables.push(callFn);
|
|
||||||
|
|
||||||
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
|
|
||||||
}
|
|
||||||
|
|
||||||
clean(){
|
|
||||||
clearInterval(this.intervalTracker);
|
|
||||||
this.newBlocksSubscription.unsubscribe();
|
|
||||||
this.intervalTracker = null;
|
|
||||||
this.callables = [];
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
@ -1,2 +1,2 @@
|
|||||||
export {default} from './eventSyncer.js';
|
export {default} from './subspace';
|
||||||
export * from './operators';
|
export * from './operators';
|
||||||
|
142
src/subspace.js
Normal file
142
src/subspace.js
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
import { ReplaySubject } from 'rxjs';
|
||||||
|
import { distinctUntilChanged } from 'rxjs/operators';
|
||||||
|
import equal from 'fast-deep-equal';
|
||||||
|
import Database from './database.js';
|
||||||
|
import Events from 'events';
|
||||||
|
import Web3Eth from 'web3-eth';
|
||||||
|
import stripHexPrefix from 'strip-hex-prefix';
|
||||||
|
import toBN from 'number-to-bn';
|
||||||
|
import EventSyncer from './eventSyncer';
|
||||||
|
|
||||||
|
export default class Subspace {
|
||||||
|
|
||||||
|
constructor(provider, options = {}) {
|
||||||
|
this.events = new Events();
|
||||||
|
this.web3 = new Web3Eth(provider);
|
||||||
|
|
||||||
|
this.options = {};
|
||||||
|
this.options.callInterval = options.callInterval || 0;
|
||||||
|
this.options.dbFilename = options.dbFilename || 'phoenix.db';
|
||||||
|
|
||||||
|
this.newBlocksSubscription = null;
|
||||||
|
this.intervalTracker = null;
|
||||||
|
this.callables = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
init() {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this._db = new Database(this.options.dbFilename, this.events, resolve);
|
||||||
|
this.db = this._db.db;
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: get contract abi/address instead
|
||||||
|
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
|
||||||
|
const eventSyncer = new EventSyncer(this.web3, this.events, this._db);
|
||||||
|
return eventSyncer.track(contractInstance, eventName, filterConditionsOrCb);
|
||||||
|
}
|
||||||
|
|
||||||
|
_initNewBlocksSubscription() {
|
||||||
|
if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
|
||||||
|
|
||||||
|
this.newBlocksSubscription = this.web3.subscribe('newBlockHeaders', (err, result) => {
|
||||||
|
if(err) {
|
||||||
|
sub.error(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.callables.forEach(fn => {
|
||||||
|
fn();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_initCallInterval() {
|
||||||
|
if(this.intervalTracker != null || this.options.callInterval === 0) return;
|
||||||
|
|
||||||
|
this.intervalTracker = setInterval(() => {
|
||||||
|
this.callables.forEach(fn => {
|
||||||
|
fn();
|
||||||
|
});
|
||||||
|
}, this.options.callInterval);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: should save value in database?
|
||||||
|
trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) {
|
||||||
|
|
||||||
|
const sub = new ReplaySubject();
|
||||||
|
|
||||||
|
const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs)
|
||||||
|
const callContractMethod = () => {
|
||||||
|
method.call.apply(method.call, [callArgs, (err, result) => {
|
||||||
|
if(err) {
|
||||||
|
sub.error(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sub.next(result);
|
||||||
|
}]);
|
||||||
|
};
|
||||||
|
|
||||||
|
callContractMethod();
|
||||||
|
|
||||||
|
this._initNewBlocksSubscription();
|
||||||
|
this._initCallInterval();
|
||||||
|
|
||||||
|
this.callables.push(callContractMethod);
|
||||||
|
|
||||||
|
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: should save value in database?
|
||||||
|
trackBalance(address, erc20Address) {
|
||||||
|
const sub = new ReplaySubject();
|
||||||
|
|
||||||
|
// TODO: validate address?
|
||||||
|
|
||||||
|
let callFn;
|
||||||
|
if(!erc20Address){
|
||||||
|
callFn = () => {
|
||||||
|
const fn = this.web3.getBalance;
|
||||||
|
|
||||||
|
fn.apply(fn, [address, (err, balance) => {
|
||||||
|
if(err) {
|
||||||
|
sub.error(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sub.next(balance);
|
||||||
|
}]);
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
callFn = () => {
|
||||||
|
const fn = this.web3.call;
|
||||||
|
// balanceOf
|
||||||
|
const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(erc20Address);
|
||||||
|
fn.apply(fn, [{to: erc20Address, data}, (err, result) => {
|
||||||
|
if(err) {
|
||||||
|
sub.error(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sub.next(toBN(result).toString(10));
|
||||||
|
}]);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
callFn();
|
||||||
|
|
||||||
|
this._initNewBlocksSubscription();
|
||||||
|
this._initCallInterval();
|
||||||
|
|
||||||
|
this.callables.push(callFn);
|
||||||
|
|
||||||
|
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
|
||||||
|
}
|
||||||
|
|
||||||
|
clean(){
|
||||||
|
clearInterval(this.intervalTracker);
|
||||||
|
this.newBlocksSubscription.unsubscribe();
|
||||||
|
this.intervalTracker = null;
|
||||||
|
this.callables = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user