mirror of
https://github.com/embarklabs/subspace.git
synced 2025-01-18 10:33:13 +00:00
fix: close web3 subscription on rxjs unsubscribe (#31)
This commit is contained in:
parent
8cf6d5c619
commit
cf04e028f2
@ -6,6 +6,8 @@ class EventSyncer {
|
||||
this.events = events;
|
||||
this.web3 = web3;
|
||||
this.db = db;
|
||||
|
||||
this.subscriptions = [];
|
||||
}
|
||||
|
||||
track(contractInstance, eventName, filterConditionsOrCb){
|
||||
@ -46,13 +48,22 @@ class EventSyncer {
|
||||
this.events.emit("updateDB");
|
||||
});
|
||||
|
||||
this._retrieveEvents(eventKey,
|
||||
eventSummary.firstKnownBlock,
|
||||
eventSummary.lastKnownBlock,
|
||||
filterConditions,
|
||||
filterConditionsCb,
|
||||
contractInstance,
|
||||
eventName);
|
||||
const eth_subscribe = this._retrieveEvents(eventKey,
|
||||
eventSummary.firstKnownBlock,
|
||||
eventSummary.lastKnownBlock,
|
||||
filterConditions,
|
||||
filterConditionsCb,
|
||||
contractInstance,
|
||||
eventName);
|
||||
|
||||
const og_subscribe = sub.subscribe;
|
||||
sub.subscribe = (next, error, complete) => {
|
||||
const s = og_subscribe.apply(sub, [next, error, complete]);
|
||||
s.add(() => { // Removing web3js subscription when rxJS unsubscribe is executed
|
||||
if(eth_subscribe) eth_subscribe.unsubscribe();
|
||||
});
|
||||
return s;
|
||||
}
|
||||
|
||||
return sub;
|
||||
}
|
||||
@ -60,7 +71,6 @@ class EventSyncer {
|
||||
_retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) {
|
||||
// TODO: this should be moved to a 'smart' module
|
||||
// it should be able to do events X at the time to avoid slow downs as well as the 10k limit
|
||||
// TODO: filter subscriptions with fromBlock and toBlock
|
||||
|
||||
if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) {
|
||||
if (filterConditions.toBlock === 'latest') {
|
||||
@ -68,7 +78,7 @@ class EventSyncer {
|
||||
this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb);
|
||||
// create a event subscription [lastKnownBlock + 1, ...]
|
||||
let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 });
|
||||
this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
|
||||
return this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
|
||||
}
|
||||
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||
// emit DB Events [fromBlock, toBlock]
|
||||
@ -92,7 +102,7 @@ class EventSyncer {
|
||||
this._serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb);
|
||||
// create a subscription [lastKnownBlock + 1, ...]
|
||||
const filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1 });
|
||||
this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
|
||||
return this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey);
|
||||
}
|
||||
else if (filterConditions.toBlock <= lastKnownBlock) {
|
||||
// emit DB Events [fromBlock, toBlock]
|
||||
@ -126,7 +136,9 @@ class EventSyncer {
|
||||
}
|
||||
|
||||
_subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) {
|
||||
event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
|
||||
const s = event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]);
|
||||
this.subscriptions.push(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
_parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => {
|
||||
@ -148,6 +160,12 @@ class EventSyncer {
|
||||
}
|
||||
this.events.emit(eventKey, ev);
|
||||
}
|
||||
|
||||
close(){
|
||||
this.subscriptions.forEach(x => {
|
||||
x.unsubscribe();
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export default EventSyncer;
|
||||
|
@ -27,13 +27,14 @@ export default class Subspace {
|
||||
return new Promise((resolve, reject) => {
|
||||
this._db = new Database(this.options.dbFilename, this.events, resolve);
|
||||
this.db = this._db.db;
|
||||
this.eventSyncer = new EventSyncer(this.web3, this.events, this._db);
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: get contract abi/address instead
|
||||
trackEvent(contractInstance, eventName, filterConditionsOrCb) {
|
||||
const eventSyncer = new EventSyncer(this.web3, this.events, this._db);
|
||||
return eventSyncer.track(contractInstance, eventName, filterConditionsOrCb);
|
||||
return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb);
|
||||
}
|
||||
|
||||
_initNewBlocksSubscription() {
|
||||
@ -64,7 +65,6 @@ export default class Subspace {
|
||||
|
||||
// TODO: should save value in database?
|
||||
trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) {
|
||||
|
||||
const sub = new ReplaySubject();
|
||||
|
||||
const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs)
|
||||
@ -132,9 +132,10 @@ export default class Subspace {
|
||||
return sub.pipe(distinctUntilChanged((a, b) => equal(a, b)));
|
||||
}
|
||||
|
||||
clean(){
|
||||
close(){
|
||||
clearInterval(this.intervalTracker);
|
||||
this.newBlocksSubscription.unsubscribe();
|
||||
if(this.newBlocksSubscription) this.newBlocksSubscription.unsubscribe();
|
||||
this.eventSyncer.close();
|
||||
this.intervalTracker = null;
|
||||
this.callables = [];
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user