feat: observables for callable functions (#71)

* return eth subscription and observable array
* adding checksum address function to utils
* use observables triggered by a subject
This commit is contained in:
Richard Ramos 2020-02-15 00:28:22 -04:00 committed by GitHub
parent 2646cd58d1
commit 8dea143adc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 190 deletions

View File

@ -67,7 +67,7 @@ class EventSyncer {
if (this.isWebsocketProvider) {
const fnSubscribe = this.subscribeToEvent(eventKey, contractInstance, eventName);
const eth_subscribe = this.eventScanner.scan(
const ethSubscription = this.eventScanner.scan(
fnDBEvents,
fnPastEvents,
fnSubscribe,
@ -75,25 +75,11 @@ class EventSyncer {
lastKnownBlock,
filterConditions
);
const og_subscribe = sub.subscribe;
sub.subscribe = async (next, error, complete) => {
const s = og_subscribe.apply(sub, [next, error, complete]);
s.add(() => {
// Removing web3js subscription when rxJS unsubscribe is executed
eth_subscribe.then(susc => {
if (susc) {
susc.unsubscribe();
}
});
});
return s;
};
return [sub, ethSubscription];
} else {
this.eventScanner.scan(fnDBEvents, fnPastEvents, lastKnownBlock, filterConditions);
return [sub, undefined];
}
return sub;
}
getPastEvents = (eventKey, contractInstance, eventName, filters) => async (fromBlock, toBlock, hardLimit) => {

View File

@ -67,24 +67,14 @@ class LogSyncer {
this.events.emit("updateDB");
});
const eth_subscribe = this._retrieveEvents(
const ethSubscription = this._retrieveEvents(
eventKey,
eventSummary.firstKnownBlock,
eventSummary.lastKnownBlock,
filterConditions
);
const og_subscribe = sub.subscribe;
sub.subscribe = (next, error, complete) => {
const s = og_subscribe.apply(sub, [next, error, complete]);
s.add(() => {
// Removing web3js subscription when rxJS unsubscribe is executed
if (eth_subscribe) eth_subscribe.unsubscribe();
});
return s;
};
return sub;
return [sub, ethSubscription];
}
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions) {

View File

@ -1,5 +1,5 @@
import {ReplaySubject, BehaviorSubject} from "rxjs";
import {distinctUntilChanged, map} from "rxjs/operators";
import {BehaviorSubject, from} from "rxjs";
import {distinctUntilChanged, map, exhaustMap, shareReplay} from "rxjs/operators";
import equal from "fast-deep-equal";
import Database from "./database/database.js";
import NullDatabase from "./database/nullDatabase.js";
@ -13,8 +13,7 @@ import LogSyncer from "./logSyncer";
import hash from "object-hash";
export default class Subspace {
subjects = {};
callables = [];
observables = {};
newBlocksSubscription = null;
intervalTracker = null;
@ -40,6 +39,7 @@ export default class Subspace {
this.networkId = undefined;
this.isWebsocketProvider = options.disableSubscriptions ? false : !!provider.on;
this.triggerSubject = new BehaviorSubject();
}
init() {
@ -154,32 +154,36 @@ export default class Subspace {
console.error(err);
return;
}
this.callables.forEach(fn => fn());
this.triggerSubject.next();
});
}
_initCallInterval() {
if (this.intervalTracker != null || this.options.callInterval === 0) return;
this.intervalTracker = setInterval(() => {
this.callables.forEach(fn => fn());
}, this.options.callInterval);
this.intervalTracker = setInterval(() => this.triggerSubject.next(), this.options.callInterval);
}
_getSubject(subjectHash, subjectCB) {
if (this.subjects[subjectHash]) return this.subjects[subjectHash];
this.subjects[subjectHash] = subjectCB();
return this.subjects[subjectHash];
_getObservable(subjectHash, observableBuilder) {
if (this.observables[subjectHash]) return this.observables[subjectHash];
this.observables[subjectHash] = observableBuilder();
return this.observables[subjectHash];
}
_addDistinctCallable(trackAttribute, cbBuilder, SubjectType, subjectArg = undefined) {
return this._getSubject(trackAttribute, () => {
const sub = new SubjectType(subjectArg);
const cb = cbBuilder(sub);
cb();
this.callables.push(cb);
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
_getDistinctObservableFromPromise(subjectName, promiseCB, cb) {
return this._getObservable(subjectName, () => {
let observable = this.triggerSubject.pipe(
exhaustMap(() => from(promiseCB())),
distinctUntilChanged((a, b) => equal(a, b))
);
if(cb){
observable = observable.pipe(map(x => {
cb(x);
return x;
}));
}
return observable.pipe(shareReplay({refCount: true, bufferSize: 1}));
});
}
@ -190,11 +194,14 @@ export default class Subspace {
eventName,
filterConditions
});
return this._getSubject(subjectHash, () => {
let deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks;
let returnSub = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId);
returnSub.map = prop => {
return this._getObservable(subjectHash, () => {
const deleteFrom = this.latestBlockNumber - this.options.refreshLastNBlocks;
const [subject, ethSubscription] = this.eventSyncer.track(contractInstance, eventName, filterConditions, deleteFrom, this.networkId);
// TODO: remove eth subscription
subject.map = prop => {
return returnSub.pipe(
map(x => {
if (typeof prop === "string") {
@ -211,7 +218,7 @@ export default class Subspace {
);
};
return returnSub;
return subject;
});
}
@ -219,13 +226,20 @@ export default class Subspace {
if (!this.isWebsocketProvider) console.warn("This method only works with websockets");
const subjectHash = hash({inputsABI, options});
return this._getSubject(subjectHash, () =>
this.logSyncer.track(options, inputsABI, this.latestBlockNumber - this.options.refreshLastNBlocks, this.networkId)
);
return this._getObservable(subjectHash, () => {
const [subject, ethSubscription] = this.logSyncer.track(
options,
inputsABI,
this.latestBlockNumber - this.options.refreshLastNBlocks,
this.networkId
);
// TODO: remove eth subscription
return subject;
});
}
trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) {
const subjectHash = hash({
const identifier = hash({
address: contractInstance.options.address,
networkId: this.networkId,
propName,
@ -233,156 +247,86 @@ export default class Subspace {
callArgs
});
return this._getSubject(subjectHash, () => {
const subject = new ReplaySubject(1);
const observable = this._getDistinctObservableFromPromise(identifier, () => {
if (!Array.isArray(methodArgs)) {
methodArgs = [methodArgs];
}
const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs);
const callContractMethod = () => {
method.call.apply(method.call, [
callArgs,
(err, result) => {
if (err) {
subject.error(err);
return;
}
subject.next(result);
}
]);
};
callContractMethod();
this.callables.push(callContractMethod);
const returnSub = subject.pipe(distinctUntilChanged((a, b) => equal(a, b)));
returnSub.map = prop => {
return returnSub.pipe(
map(x => {
if (typeof prop === "string") {
return x[prop];
}
if (Array.isArray(prop)) {
let newValues = {};
prop.forEach(p => {
newValues[p] = x[p];
});
return newValues;
}
})
);
};
return returnSub;
return method.call.apply(method.call, [callArgs]);
});
observable.map = prop => {
return observable.pipe(
map(x => {
if (typeof prop === "string") {
return x[prop];
}
if (Array.isArray(prop)) {
let newValues = {};
prop.forEach(p => {
newValues[p] = x[p];
});
return newValues;
}
})
);
};
return observable;
}
trackBalance(address, erc20Address) {
if (!isAddress(address)) throw "invalid address";
if (erc20Address && !isAddress(erc20Address)) throw "invalid ERC20 contract address";
const subjectHash = hash({address, erc20Address});
const getETHBalance = cb => {
const fn = this.web3.getBalance;
fn.apply(fn, [address, cb]);
};
const getTokenBalance = cb => {
const fn = this.web3.call;
// balanceOf
const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(address);
fn.apply(fn, [{to: erc20Address, data}, cb]);
};
let callFn;
if (!erc20Address) {
callFn = subject => () =>
getETHBalance((err, balance) => {
if (err) {
subject.error(err);
return;
}
subject.next(balance);
});
} else {
callFn = subject => () =>
getTokenBalance((err, balance) => {
if (err) {
subject.error(err);
return;
}
subject.next(hexToDec(balance));
});
}
return this._addDistinctCallable(subjectHash, callFn, ReplaySubject, 1);
}
trackBlock() {
const blockCB = subject => () => {
this.web3
.getBlock("latest")
.then(block => {
if (this.latest10Blocks[this.latest10Blocks.length - 1].number === block.number) return;
this.latest10Blocks.push(block);
if (this.latest10Blocks.length > 10) {
this.latest10Blocks.shift();
}
subject.next(block);
})
.catch(error => subject.error(error));
};
return this._addDistinctCallable(
"blockObservable",
blockCB,
BehaviorSubject,
this.latest10Blocks[this.latest10Blocks.length - 1]
);
address = toChecksumAddress(address);
erc20Address = toChecksumAddress(address);
return this._getDistinctObservableFromPromise(hash({address, erc20Address}), () => {
if (!erc20Address) {
return this.web3.getBalance(address);
} else {
// balanceOf
const data = "0x70a08231" + "000000000000000000000000" + stripHexPrefix(address);
return new Promise((resolve, reject) => this.web3.call({to: erc20Address, data}).then(balance => resolve(hexToDec(balance))).catch(reject));
}
});
}
trackBlockNumber() {
const blockNumberCB = subject => () => {
this.web3
.getBlockNumber()
.then(result => subject.next(result))
.catch(error => subject.error(error));
};
return this._addDistinctCallable("blockNumberObservable", blockNumberCB, ReplaySubject, 1);
return this._getDistinctObservableFromPromise("blockNumber", () => this.web3.getBlockNumber());
}
trackBlock() {
return this._getDistinctObservableFromPromise("gasPrice", () => this.web3.getBlock("latest"), block => {
if (this.latest10Blocks[this.latest10Blocks.length - 1].number === block.number) return;
this.latest10Blocks.push(block);
if (this.latest10Blocks.length > 10) {
this.latest10Blocks.shift();
}
});
}
trackGasPrice() {
const gasPriceCB = subject => () => {
this.web3
.getGasPrice()
.then(result => subject.next(result))
.catch(error => subject.error(error));
};
return this._addDistinctCallable("gasPriceObservable", gasPriceCB, ReplaySubject, 1);
return this._getDistinctObservableFromPromise("gasPrice", () => this.web3.getGasPrice());
}
trackAverageBlocktime() {
this.trackBlock();
const calcAverage = () => {
const times = [];
for (let i = 1; i < this.latest10Blocks.length; i++) {
let time = this.latest10Blocks[i].timestamp - this.latest10Blocks[i - 1].timestamp;
times.push(time);
}
return times.length ? Math.round(times.reduce((a, b) => a + b) / times.length) * 1000 : 0;
};
const avgTimeCB = subject => () => subject.next(calcAverage());
return this._addDistinctCallable("blockTimeObservable", avgTimeCB, BehaviorSubject, calcAverage());
return this._getObservable("avgBlockTime", () => {
const calcAverage = () => {
const times = [];
for (let i = 1; i < this.latest10Blocks.length; i++) {
let time = this.latest10Blocks[i].timestamp - this.latest10Blocks[i - 1].timestamp;
times.push(time);
}
return times.length ? Math.round(times.reduce((a, b) => a + b) / times.length) * 1000 : 0;
};
return this.trackBlock().pipe(
map(() => calcAverage()),
distinctUntilChanged((a, b) => equal(a, b))
);
});
}
close() {
@ -390,6 +334,5 @@ export default class Subspace {
if (this.newBlocksSubscription) this.newBlocksSubscription.unsubscribe();
this.eventSyncer.close();
this.intervalTracker = null;
this.callables = [];
}
}

View File

@ -2,6 +2,24 @@ export function isAddress(address) {
return /^(0x)?[0-9a-fA-F]{40}$/i.test(address);
}
export function toChecksumAddress(address) {
if (typeof address === "undefined") return "";
address = address.toLowerCase().replace(/^0x/i, "");
var addressHash = utils.sha3(address).replace(/^0x/i, "");
var checksumAddress = "0x";
for (var i = 0; i < address.length; i++) {
// If ith character is 9 to f then make it uppercase
if (parseInt(addressHash[i], 16) > 7) {
checksumAddress += address[i].toUpperCase();
} else {
checksumAddress += address[i];
}
}
return checksumAddress;
}
export function sleep(milliseconds) {
return new Promise(resolve => setTimeout(resolve, milliseconds));
}