This commit is contained in:
Iuri Matias 2019-10-02 17:58:42 -04:00
parent 768d726861
commit 5fca3802c4
9 changed files with 36 additions and 102 deletions

View File

@ -22,7 +22,6 @@ class App extends React.Component {
await subspace.init(); await subspace.init();
Product = await ProductContract.getInstance(); Product = await ProductContract.getInstance();
console.dir(Product.options)
const rating$ = subspace.trackEvent(Product, "Rating").pipe(map(x => parseInt(x.rating))); const rating$ = subspace.trackEvent(Product, "Rating").pipe(map(x => parseInt(x.rating)));
window.Product = Product; window.Product = Product;

View File

@ -21,12 +21,10 @@ const getENV = function () {
} }
return 'BROWSER'; return 'BROWSER';
} }
return 'CORDOVA'; return 'CORDOVA';
}; };
class Database { class Database {
constructor(dbFilename, events, cb) { constructor(dbFilename, events, cb) {
@ -56,20 +54,19 @@ class Database {
let firstKnownBlock = 0; let firstKnownBlock = 0;
let lastKnownBlock = 0; let lastKnownBlock = 0;
if(collection && collection.count()){ if (collection && collection.count()){
firstKnownBlock = collection.min('blockNumber'); firstKnownBlock = collection.min('blockNumber');
lastKnownBlock = collection.max('blockNumber'); lastKnownBlock = collection.max('blockNumber');
} else { } else {
this.db.addCollection(eventKey); this.db.addCollection(eventKey);
} }
return { return {
firstKnownBlock: firstKnownBlock || 0, firstKnownBlock: firstKnownBlock || 0,
lastKnownBlock: lastKnownBlock || 0 lastKnownBlock: lastKnownBlock || 0
}; };
} }
getEventsFor(eventKey) { getEventsFor(eventKey) {
let children = this.db.getCollection(eventKey); let children = this.db.getCollection(eventKey);
return children.find(); return children.find();

View File

@ -2,15 +2,15 @@ import { fromEvent, ReplaySubject } from 'rxjs';
import hash from 'object-hash'; import hash from 'object-hash';
class EventSyncer { class EventSyncer {
constructor(web3, events, db) { constructor(web3, events, db) {
this.events = events; this.events = events;
this.web3 = web3; this.web3 = web3;
this.db = db; this.db = db;
this.subscriptions = []; this.subscriptions = [];
} }
track(contractInstance, eventName, filterConditionsOrCb, gteBlockNum){ track(contractInstance, eventName, filterConditionsOrCb, gteBlockNum) {
const isFilterFunction = typeof filterConditionsOrCb === 'function'; const isFilterFunction = typeof filterConditionsOrCb === 'function';
const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {})))); const eventKey = hash(Object.assign({address: contractInstance.options.address}, (isFilterFunction ? {filterConditionsOrCb} : (filterConditionsOrCb || {}))));
@ -30,10 +30,10 @@ class EventSyncer {
let contractObserver = fromEvent(this.events, eventKey) let contractObserver = fromEvent(this.events, eventKey)
contractObserver.subscribe((e) => { contractObserver.subscribe((e) => {
if(!e) return; if (!e) return;
const id = hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}); const id = hash({eventName, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex});
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions // TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
const eventData = { const eventData = {
id, id,
@ -47,8 +47,8 @@ class EventSyncer {
// TODO: test reorgs // TODO: test reorgs
sub.next({blockNumber: e.blockNumber, ...e.returnValues}); sub.next({blockNumber: e.blockNumber, ...e.returnValues});
if(e.removed){ if (e.removed){
this.db.deleteEvent(eventKey, id); this.db.deleteEvent(eventKey, id);
return; return;
} }
@ -83,7 +83,7 @@ class EventSyncer {
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) { _retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) {
// TODO: this should be moved to a 'smart' module // TODO: this should be moved to a 'smart' module
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit // it should be able to do events X at the time to avoid slow downs as well as the 10k limit
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) { if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
if (filterConditions.toBlock === 'latest') { if (filterConditions.toBlock === 'latest') {
// emit DB Events [fromBlock, lastKnownBlock] // emit DB Events [fromBlock, lastKnownBlock]
@ -129,7 +129,7 @@ class EventSyncer {
} }
} }
} }
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) { _serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb) {
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey); const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock); const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
@ -137,7 +137,7 @@ class EventSyncer {
cb(null, ev); cb(null, ev);
}); });
} }
_getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) { _getPastEvents(contract, eventName, filterConditions, filterConditionsCb, eventKey) {
const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey); const cb = this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey);
contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => { contract.getPastEvents.apply(contract, [eventName, filterConditions, (err, events) => {
@ -146,13 +146,13 @@ class EventSyncer {
}); });
}]); }]);
} }
_subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) { _subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) {
const s = event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]); const s = event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
this.subscriptions.push(s); this.subscriptions.push(s);
return s; return s;
} }
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => { _parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
if(err) { if(err) {
console.error(err); console.error(err);

View File

@ -10,18 +10,17 @@ class LogSyncer {
this.subscriptions = []; this.subscriptions = [];
} }
track(options){ track(options) {
const eventKey = 'logs-' + hash(options || {}); const eventKey = 'logs-' + hash(options || {});
const filterConditions = Object.assign({fromBlock: 0, toBlock: "latest"}, options || {}); const filterConditions = Object.assign({fromBlock: 0, toBlock: "latest"}, options || {});
const eventSummary = this.db.getLastKnownEvent(eventKey); const eventSummary = this.db.getLastKnownEvent(eventKey);
const sub = new ReplaySubject(); const sub = new ReplaySubject();
const logObserver = fromEvent(this.events, eventKey) const logObserver = fromEvent(this.events, eventKey)
logObserver.subscribe((e) => { logObserver.subscribe((e) => {
if(!e) return; if (!e) return;
// TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions // TODO: would be nice if this was smart enough to understand the type of returnValues and do the needed conversions
const eventData = { const eventData = {
id: hash({eventName: eventKey, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}), id: hash({eventName: eventKey, blockNumber: e.blockNumber, transactionIndex: e.transactionIndex, logIndex: e.logIndex}),
@ -105,7 +104,7 @@ class LogSyncer {
} }
} }
} }
_serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions) { _serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions) {
const cb = this._parseEventCBFactory(filterConditions, eventKey); const cb = this._parseEventCBFactory(filterConditions, eventKey);
const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock); const storedEvents = this.db.getEventsFor(eventKey).filter(x => x.blockNumber >= firstKnownBlock && x.blockNumber <= lastKnownBlock);
@ -113,7 +112,7 @@ class LogSyncer {
cb(null, ev); cb(null, ev);
}); });
} }
_getPastEvents(filterConditions, eventKey) { _getPastEvents(filterConditions, eventKey) {
const cb = this._parseEventCBFactory(filterConditions, eventKey); const cb = this._parseEventCBFactory(filterConditions, eventKey);
this.web3.getPastLogs(options, (err, logs) => { this.web3.getPastLogs(options, (err, logs) => {
@ -126,31 +125,31 @@ class LogSyncer {
}) })
}); });
} }
_subscribeToEvent(filterConditions, eventKey) { _subscribeToEvent(filterConditions, eventKey) {
const s = this.web3.subscribe('logs', filterConditions, this._parseEventCBFactory(filterConditions, eventKey)); const s = this.web3.subscribe('logs', filterConditions, this._parseEventCBFactory(filterConditions, eventKey));
this.subscriptions.push(s); this.subscriptions.push(s);
return s; return s;
} }
_parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => { _parseEventCBFactory = (filterConditions, eventKey) => (err, ev) => {
if(err) { if (err) {
throw new Error(err); throw new Error(err);
} }
if (filterConditions) { if (filterConditions) {
if(filterConditions.address && ev.address.toLowerCase() !== filterConditions.address.toLowerCase()) return; if (filterConditions.address && ev.address.toLowerCase() !== filterConditions.address.toLowerCase()) return;
if(filterConditions.topics){ if (filterConditions.topics){
let shouldSkip = false; let shouldSkip = false;
filterConditions.topics.forEach((topic, i) => { filterConditions.topics.forEach((topic, i) => {
if(topic != null && (!ev.topics[i] || ev.topics[i].toLowerCase() !== topic.toLowerCase())){ if (topic != null && (!ev.topics[i] || ev.topics[i].toLowerCase() !== topic.toLowerCase())){
shouldSkip = true; shouldSkip = true;
} }
}); });
if(shouldSkip) return; if(shouldSkip) return;
} }
} }
this.events.emit(eventKey, ev); this.events.emit(eventKey, ev);
} }

View File

@ -1,30 +0,0 @@
// var Web3 = require('web3')
const Events = require('events')
const { map, scan } = require('rxjs/operators');
const Simulator = require('./simulator.js')
const EventSyncer = require('./eventSyncer.js')
const events = new Events()
const eventSyncer = new EventSyncer(events);
eventSyncer.init(run);
function run() {
let myscan = scan((acc, curr) => {
acc.push(curr);
if (acc.length > 4) {
acc.shift();
}
return acc;
}, [])
let mymap = map(arr => arr.reduce((acc, current) => acc + current, 0) / arr.length)
eventSyncer.trackEvent('contractEvent', ((x) => x.from === "0x123")).pipe(map(x => x.rating), myscan, mymap).subscribe((v) => {
console.dir("current average is " + v)
})
const simulator = new Simulator(events);
simulator.emitEvents()
}

View File

@ -41,11 +41,11 @@ export function observe(WrappedComponent) {
} }
}); });
} }
componentDidMount() { componentDidMount() {
Object.keys(this.props).forEach(this.subscribeToProp); Object.keys(this.props).forEach(this.subscribeToProp);
} }
componentWillUnmount() { componentWillUnmount() {
this.state.subscriptions.forEach(subscription => { this.state.subscriptions.forEach(subscription => {
subscription.unsubscribe(); subscription.unsubscribe();
@ -54,7 +54,7 @@ export function observe(WrappedComponent) {
componentDidUpdate(prevProps) { componentDidUpdate(prevProps) {
Object.keys(prevProps).forEach(prop => { Object.keys(prevProps).forEach(prop => {
if(!prevProps[prop] && this.props[prop]){ if (!prevProps[prop] && this.props[prop]){
this.subscribeToProp(prop); this.subscribeToProp(prop);
} else if(prevProps[prop] !== this.props[prop]){ } else if(prevProps[prop] !== this.props[prop]){
this.unsubscribe(prop); this.unsubscribe(prop);
@ -64,10 +64,10 @@ export function observe(WrappedComponent) {
// TODO: check if prevProps and currProps are different, and unsubscribe from prevProp // TODO: check if prevProps and currProps are different, and unsubscribe from prevProp
} }
render() { render() {
const props = Object.keys(this.props).reduce((accum, curr) => { const props = Object.keys(this.props).reduce((accum, curr) => {
if(!isObservable(this.props[curr])){ if (!isObservable(this.props[curr])){
accum[curr] = this.props[curr]; accum[curr] = this.props[curr];
return accum; return accum;
} }

View File

@ -1,27 +0,0 @@
class Simulator {
constructor(events) {
this.events = events;
this.contractEvents = [
{ id: 1, from: "0x123", type: "Rating", rating: 3 },
{ id: 2, from: "0x123", type: "Rating", rating: 1 },
{ id: 3, from: "0x234", type: "Rating", rating: 5 },
{ id: 4, from: "0x123", type: "Rating", rating: 4 },
{ id: 5, from: "0x123", type: "Rating", rating: 2 },
{ id: 6, from: "0x342", type: "Rating", rating: 2 }
]
}
emitEvents() {
let i = 0
// emit contract event each 1 second
setInterval(() => {
if (i >= this.contractEvents.length) return
this.events.emit("contractEvent", this.contractEvents[i])
i += 1
}, 1 * 1000)
}
}
module.exports = Simulator;

View File

@ -13,7 +13,6 @@ import LogSyncer from './logSyncer';
export default class Subspace { export default class Subspace {
constructor(provider, options = {}) { constructor(provider, options = {}) {
if(provider.constructor.name !== "WebsocketProvider"){ if(provider.constructor.name !== "WebsocketProvider"){
console.warn("subspace: it's recommended to use a websocket provider to react to new events"); console.warn("subspace: it's recommended to use a websocket provider to react to new events");
} }
@ -26,7 +25,7 @@ export default class Subspace {
this.options.callInterval = options.callInterval || 0; this.options.callInterval = options.callInterval || 0;
this.options.dbFilename = options.dbFilename || 'subspace.db'; this.options.dbFilename = options.dbFilename || 'subspace.db';
this.latestBlockNumber = undefined; this.latestBlockNumber = undefined;
this.newBlocksSubscription = null; this.newBlocksSubscription = null;
this.intervalTracker = null; this.intervalTracker = null;
this.callables = []; this.callables = [];
@ -63,7 +62,6 @@ export default class Subspace {
return this.logSyncer.track(options); return this.logSyncer.track(options);
} }
_initNewBlocksSubscription() { _initNewBlocksSubscription() {
if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return; if(this.newBlocksSubscription != null || this.options.callInterval !== 0) return;
@ -72,7 +70,7 @@ export default class Subspace {
sub.error(err); sub.error(err);
return; return;
} }
this.callables.forEach(fn => { this.callables.forEach(fn => {
fn(); fn();
}); });
@ -87,7 +85,6 @@ export default class Subspace {
fn(); fn();
}); });
}, this.options.callInterval); }, this.options.callInterval);
} }
// TODO: should save value in database? // TODO: should save value in database?
@ -104,12 +101,12 @@ export default class Subspace {
sub.next(result); sub.next(result);
}]); }]);
}; };
callContractMethod(); callContractMethod();
this._initNewBlocksSubscription(); this._initNewBlocksSubscription();
this._initCallInterval(); this._initCallInterval();
this.callables.push(callContractMethod); this.callables.push(callContractMethod);
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
@ -153,7 +150,7 @@ export default class Subspace {
this._initNewBlocksSubscription(); this._initNewBlocksSubscription();
this._initCallInterval(); this._initCallInterval();
this.callables.push(callFn); this.callables.push(callFn);
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));

View File

@ -3,7 +3,6 @@ export function randomString() {
return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
} }
export function isAddress(address) { export function isAddress(address) {
return /^(0x)?[0-9a-fA-F]{40}$/i.test(address) return /^(0x)?[0-9a-fA-F]{40}$/i.test(address)
}; };