feat: allow tracking logs (#34)
* fix: error handling * feat: log syncer * fix: added subspace.db to ignore list * feat: test case for logs
This commit is contained in:
parent
97d33bbe2a
commit
01c7b2a8be
|
@ -4,4 +4,5 @@ phoenix.db
|
||||||
TODO
|
TODO
|
||||||
test.js
|
test.js
|
||||||
/dist/
|
/dist/
|
||||||
/react/
|
/react/
|
||||||
|
subspace.db
|
||||||
|
|
|
@ -4,3 +4,4 @@ phoenix.db
|
||||||
examples/
|
examples/
|
||||||
test/
|
test/
|
||||||
.editorconfig
|
.editorconfig
|
||||||
|
subspace.db
|
||||||
|
|
|
@ -12,7 +12,7 @@ class EventSyncer {
|
||||||
|
|
||||||
track(contractInstance, eventName, filterConditionsOrCb){
|
track(contractInstance, eventName, filterConditionsOrCb){
|
||||||
const isFilterFunction = typeof filterConditionsOrCb === 'function';
|
const isFilterFunction = typeof filterConditionsOrCb === 'function';
|
||||||
const eventKey = eventName + '-' + hash(isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}));
|
const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}))));
|
||||||
|
|
||||||
let filterConditions = {fromBlock: 0, toBlock: "latest"};
|
let filterConditions = {fromBlock: 0, toBlock: "latest"};
|
||||||
let filterConditionsCb;
|
let filterConditionsCb;
|
||||||
|
@ -142,8 +142,11 @@ class EventSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
|
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
|
||||||
if(err) return;
|
if(err) {
|
||||||
|
console.error(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (filterConditions) {
|
if (filterConditions) {
|
||||||
let propsToFilter = [];
|
let propsToFilter = [];
|
||||||
for (let prop in filterConditions.filter) {
|
for (let prop in filterConditions.filter) {
|
||||||
|
|
|
@ -0,0 +1,161 @@
|
||||||
|
import { fromEvent, ReplaySubject } from 'rxjs';
|
||||||
|
import hash from 'object-hash';
|
||||||
|
|
||||||
|
class LogSyncer {
|
||||||
|
constructor(web3, events, db) {
|
||||||
|
this.events = events;
|
||||||
|
this.web3 = web3;
|
||||||
|
this.db = db;
|
||||||
|
|
||||||
|
this.subscriptions = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
track(options){
|
||||||
|
const eventKey = 'logs-' + hash(options || {});
|
||||||
|
const filterConditions = Object.assign({fromBlock: 0, toBlock: "latest"}, options || {});
|
||||||
|
|
||||||
|
|
||||||
|
const eventSummary = this.db.getLastKnownEvent(eventKey);
|
||||||
|
const sub = new ReplaySubject();
|
||||||
|
const logObserver = fromEvent(this.events, eventKey)
|
||||||
|
|
||||||
|
logObserver.subscribe((e) => {
|
||||||
|
if(!e) return;
|
||||||
|
|
||||||
|
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
|
||||||
|
const eventData = {
|
||||||
|
id: hash({eventName: eventKey, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}),
|
||||||
|
data: e.data,
|
||||||
|
address: e.address,
|
||||||
|
topics: e.topics
|
||||||
|
}
|
||||||
|
|
||||||
|
sub.next({blockNumber: e.blockNumber, data: e.data, address: e.address, topics: e.topics});
|
||||||
|
|
||||||
|
if (this.db.eventExists(eventKey, eventData.id)) return;
|
||||||
|
|
||||||
|
this.db.recordEvent(eventKey, eventData);
|
||||||
|
|
||||||
|
this.events.emit("updateDB");
|
||||||
|
});
|
||||||
|
|
||||||
|
const eth_subscribe = this._retrieveEvents(eventKey,
|
||||||
|
eventSummary.firstKnownBlock,
|
||||||
|
eventSummary.lastKnownBlock,
|
||||||
|
filterConditions
|
||||||
|
);
|
||||||
|
|
||||||
|
const og_subscribe = sub.subscribe;
|
||||||
|
sub.subscribe = (next, error, complete) => {
|
||||||
|
const s = og_subscribe.apply(sub, [next, error, complete]);
|
||||||
|
s.add(() => { // Removing web3js subscription when rxJS unsubscribe is executed
|
||||||
|
if(eth_subscribe) eth_subscribe.unsubscribe();
|
||||||
|
});
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
return sub;
|
||||||
|
}
|
||||||
|
|
||||||
|
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions) {
|
||||||
|
// TODO: this should be moved to a 'smart' module
|
||||||
|
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
||||||
|
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
|
||||||
|
if (filterConditions.toBlock === 'latest') {
|
||||||
|
// emit DB Events [fromBlock, lastKnownBlock]
|
||||||
|
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions);
|
||||||
|
// create a event subscription [lastKnownBlock + 1, ...]
|
||||||
|
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
|
||||||
|
return this._subscribeToEvent(filters, eventKey);
|
||||||
|
}
|
||||||
|
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||||
|
// emit DB Events [fromBlock, toBlock]
|
||||||
|
this._serveDBEvents(eventKey, filterConditions.fromBlock, filterConditions.toBlock, filterConditions);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// emit DB Events [fromBlock, lastKnownBlock]
|
||||||
|
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions);
|
||||||
|
// create a past event subscription [lastKnownBlock + 1, toBlock]
|
||||||
|
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
|
||||||
|
this._getPastEvents(filters, eventKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (firstKnownBlock > 0) {
|
||||||
|
// create a past event subscription [ firstKnownBlock > fromBlock ? fromBlock : 0, firstKnownBlock - 1]
|
||||||
|
let fromBlock = firstKnownBlock > filterConditions.fromBlock ? filterConditions.fromBlock : 0;
|
||||||
|
let filters = Object.assign({}, filterConditions, { fromBlock, toBlock: firstKnownBlock - 1 });
|
||||||
|
this._getPastEvents(filters, eventKey);
|
||||||
|
if (filterConditions.toBlock === 'latest') {
|
||||||
|
// emit DB Events [firstKnownBlock, lastKnownBlock]
|
||||||
|
this._serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions);
|
||||||
|
// create a subscription [lastKnownBlock + 1, ...]
|
||||||
|
const filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1 });
|
||||||
|
return this._subscribeToEvent(filters, eventKey);
|
||||||
|
}
|
||||||
|
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||||
|
// emit DB Events [fromBlock, toBlock]
|
||||||
|
this._serveDBEvents(eventKey, filterConditions.fromBlock, filterConditions.toBlock, filterConditions);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// emit DB Events [fromBlock, lastKnownBlock]
|
||||||
|
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions);
|
||||||
|
// create a past event subscription [lastKnownBlock + 1, toBlock]
|
||||||
|
let filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1, toBlock: filterConditions.toBlock });
|
||||||
|
this._getPastEvents(filters, eventKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions) {
|
||||||
|
const cb = this._parseEventCBFactory(filterConditions, eventKey);
|
||||||
|
const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
|
||||||
|
storedEvents.forEach(ev => {
|
||||||
|
cb(null, ev);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_getPastEvents(filterConditions, eventKey) {
|
||||||
|
const cb = this._parseEventCBFactory(filterConditions, eventKey);
|
||||||
|
this.web3.getPastLogs(options, (err, logs) => {
|
||||||
|
logs.forEach(l => {
|
||||||
|
cb(err, l);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_subscribeToEvent(filterConditions, eventKey) {
|
||||||
|
const s = this.web3.subscribe('logs', filterConditions, this._parseEventCBFactory(filterConditions, eventKey));
|
||||||
|
this.subscriptions.push(s);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
_parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => {
|
||||||
|
if(err) {
|
||||||
|
console.error(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (filterConditions) {
|
||||||
|
if(filterConditions.address && ev.address.toLowerCase() !== filterConditions.address.toLowerCase()) return;
|
||||||
|
if(filterConditions.topics){
|
||||||
|
let shouldSkip = false;
|
||||||
|
filterConditions.topics.forEach((topic, i) => {
|
||||||
|
if(topic != null && (!ev.topics[i] || ev.topics[i].toLowerCase() !== topic.toLowerCase())){
|
||||||
|
shouldSkip = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if(shouldSkip) return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.events.emit(eventKey, ev);
|
||||||
|
}
|
||||||
|
|
||||||
|
close(){
|
||||||
|
this.subscriptions.forEach(x => {
|
||||||
|
x.unsubscribe();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default LogSyncer;
|
|
@ -7,6 +7,7 @@ import Web3Eth from 'web3-eth';
|
||||||
import stripHexPrefix from 'strip-hex-prefix';
|
import stripHexPrefix from 'strip-hex-prefix';
|
||||||
import toBN from 'number-to-bn';
|
import toBN from 'number-to-bn';
|
||||||
import EventSyncer from './eventSyncer';
|
import EventSyncer from './eventSyncer';
|
||||||
|
import LogSyncer from './logSyncer';
|
||||||
|
|
||||||
export default class Subspace {
|
export default class Subspace {
|
||||||
|
|
||||||
|
@ -16,7 +17,7 @@ export default class Subspace {
|
||||||
|
|
||||||
this.options = {};
|
this.options = {};
|
||||||
this.options.callInterval = options.callInterval || 0;
|
this.options.callInterval = options.callInterval || 0;
|
||||||
this.options.dbFilename = options.dbFilename || 'phoenix.db';
|
this.options.dbFilename = options.dbFilename || 'subspace.db';
|
||||||
|
|
||||||
this.newBlocksSubscription = null;
|
this.newBlocksSubscription = null;
|
||||||
this.intervalTracker = null;
|
this.intervalTracker = null;
|
||||||
|
@ -28,7 +29,7 @@ export default class Subspace {
|
||||||
this._db = new Database(this.options.dbFilename, this.events, resolve);
|
this._db = new Database(this.options.dbFilename, this.events, resolve);
|
||||||
this.db = this._db.db;
|
this.db = this._db.db;
|
||||||
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db);
|
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db);
|
||||||
|
this.logSyncer = new LogSyncer(this.web3, this.events, this._db);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,6 +38,19 @@ export default class Subspace {
|
||||||
return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb);
|
return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clearDB(collection) {
|
||||||
|
if(collection){
|
||||||
|
// TODO: delete specific collection
|
||||||
|
} else {
|
||||||
|
// TODO: delete everything
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trackLogs(options) {
|
||||||
|
return this.logSyncer.track(options);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
_initNewBlocksSubscription() {
|
_initNewBlocksSubscription() {
|
||||||
if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
|
if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
const Web3Eth = require('web3-eth');
|
||||||
|
|
||||||
|
let eth = new Web3Eth("wss://mainnet.infura.io/ws/v3/e62b6ada19b042ee9c6d68746b965ccf");
|
||||||
|
|
||||||
|
async function run() {
|
||||||
|
let accounts = await eth.getAccounts();
|
||||||
|
|
||||||
|
const EventSyncer = require('../dist/node.js');
|
||||||
|
const eventSyncer = new EventSyncer(eth.currentProvider);
|
||||||
|
|
||||||
|
await eventSyncer.init()
|
||||||
|
|
||||||
|
// Testing single block with a event
|
||||||
|
eventSyncer.trackLogs({address: "0x744d70fdbe2ba4cf95131626614a1763df805b9e", topics: ["0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", "0x00000000000000000000000068f47e153e1aa7d6529e078feff86eada87ddee3", null]}).subscribe((v) => {
|
||||||
|
console.log(v);
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
run()
|
Loading…
Reference in New Issue