diff --git a/src/eventSyncer.js b/src/eventSyncer.js index 38672b1..1aa9723 100644 --- a/src/eventSyncer.js +++ b/src/eventSyncer.js @@ -6,6 +6,8 @@ class EventSyncer { this.events = events; this.web3 = web3; this.db = db; + + this.subscriptions = []; } track(contractInstance, eventName, filterConditionsOrCb){ @@ -46,13 +48,22 @@ class EventSyncer { this.events.emit("updateDB"); }); - this._retrieveEvents(eventKey, - eventSummary.firstKnownBlock, - eventSummary.lastKnownBlock, - filterConditions, - filterConditionsCb, - contractInstance, - eventName); + const eth_subscribe = this._retrieveEvents(eventKey, + eventSummary.firstKnownBlock, + eventSummary.lastKnownBlock, + filterConditions, + filterConditionsCb, + contractInstance, + eventName); + + const og_subscribe = sub.subscribe; + sub.subscribe = (next, error, complete) => { + const s = og_subscribe.apply(sub, [next, error, complete]); + s.add(() => { // Removing web3js subscription when rxJS unsubscribe is executed + if(eth_subscribe) eth_subscribe.unsubscribe(); + }); + return s; + } return sub; } @@ -60,7 +71,6 @@ class EventSyncer { _retrieveEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb, contractInstance, eventName) { // TODO: this should be moved to a 'smart' module // it should be able to do events X at the time to avoid slow downs as well as the 10k limit - // TODO: filter subscriptions with fromBlock and toBlock if (firstKnownBlock == 0 || (firstKnownBlock > 0 && firstKnownBlock <= filterConditions.fromBlock)) { if (filterConditions.toBlock === 'latest') { @@ -68,7 +78,7 @@ class EventSyncer { this._serveDBEvents(eventKey, filterConditions.fromBlock, lastKnownBlock, filterConditions, filterConditionsCb); // create a event subscription [lastKnownBlock + 1, ...] let filters = Object.assign({}, filterConditions, { fromBlock: filterConditions.fromBlock > lastKnownBlock ? filterConditions.fromBlock : lastKnownBlock + 1 }); - this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey); + return this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey); } else if (filterConditions.toBlock <= lastKnownBlock) { // emit DB Events [fromBlock, toBlock] @@ -92,7 +102,7 @@ class EventSyncer { this._serveDBEvents(eventKey, firstKnownBlock, lastKnownBlock, filterConditions, filterConditionsCb); // create a subscription [lastKnownBlock + 1, ...] const filters = Object.assign({}, filterConditions, { fromBlock: lastKnownBlock + 1 }); - this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey); + return this._subscribeToEvent(contractInstance.events[eventName], filters, filterConditionsCb, eventKey); } else if (filterConditions.toBlock <= lastKnownBlock) { // emit DB Events [fromBlock, toBlock] @@ -126,7 +136,9 @@ class EventSyncer { } _subscribeToEvent(event, filterConditions, filterConditionsCb, eventKey) { - event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]); + const s = event.apply(event, [filterConditions, this._parseEventCBFactory(filterConditions, filterConditionsCb, eventKey) ]); + this.subscriptions.push(s); + return s; } _parseEventCBFactory = (filterConditions, filterConditionsCb, eventKey) => (err, ev) => { @@ -148,6 +160,12 @@ class EventSyncer { } this.events.emit(eventKey, ev); } + + close(){ + this.subscriptions.forEach(x => { + x.unsubscribe(); + }) + } } export default EventSyncer; diff --git a/src/subspace.js b/src/subspace.js index 21a2d12..be5c9d3 100644 --- a/src/subspace.js +++ b/src/subspace.js @@ -27,13 +27,14 @@ export default class Subspace { return new Promise((resolve, reject) => { this._db = new Database(this.options.dbFilename, this.events, resolve); this.db = this._db.db; + this.eventSyncer = new EventSyncer(this.web3, this.events, this._db); + }) } // TODO: get contract abi/address instead trackEvent(contractInstance, eventName, filterConditionsOrCb) { - const eventSyncer = new EventSyncer(this.web3, this.events, this._db); - return eventSyncer.track(contractInstance, eventName, filterConditionsOrCb); + return this.eventSyncer.track(contractInstance, eventName, filterConditionsOrCb); } _initNewBlocksSubscription() { @@ -64,7 +65,6 @@ export default class Subspace { // TODO: should save value in database? trackProperty(contractInstance, propName, methodArgs = [], callArgs = {}) { - const sub = new ReplaySubject(); const method = contractInstance.methods[propName].apply(contractInstance.methods[propName], methodArgs) @@ -132,9 +132,10 @@ export default class Subspace { return sub.pipe(distinctUntilChanged((a, b) => equal(a, b))); } - clean(){ + close(){ clearInterval(this.intervalTracker); - this.newBlocksSubscription.unsubscribe(); + if(this.newBlocksSubscription) this.newBlocksSubscription.unsubscribe(); + this.eventSyncer.close(); this.intervalTracker = null; this.callables = []; }