fixed subscription

This commit is contained in:
Fabian Vogelsteller 2017-01-26 10:35:40 +01:00
parent aad9ae5a12
commit d101973a8f
No known key found for this signature in database
GPG Key ID: E51EADA77F1A4124

View File

@ -171,8 +171,8 @@ Subscription.prototype.unsubscribe = function(callback) {
* Subscribes and watches for changes
*
* @method subscribe
* @param {String} the subscription
* @param {Object} the options object with address topics and fromBlock
* @param {String} subscription the subscription
* @param {Object} options the options object with address topics and fromBlock
* @return {Object}
*/
Subscription.prototype.subscribe = function() {
@ -208,61 +208,60 @@ Subscription.prototype.subscribe = function() {
}
// create subscription
if (_this.callback) {
if(typeof payload.params[1] === 'object')
delete payload.params[1].fromBlock;
if(typeof payload.params[1] === 'object')
delete payload.params[1].fromBlock;
this.options.requestManager.send(payload, function (err, result) {
if(!err && result) {
_this.id = result;
this.options.requestManager.send(payload, function (err, result) {
if(!err && result) {
_this.id = result;
// call callback on notifications
_this.options.requestManager.addSubscription(_this.id, payload.params[0] ,'eth', function(err, result) {
// call callback on notifications
_this.options.requestManager.addSubscription(_this.id, payload.params[0] ,'eth', function(err, result) {
// TODO remove once its fixed in geth
if(_.isArray(result))
result = result[0];
// TODO remove once its fixed in geth
if(_.isArray(result))
result = result[0];
var output = _this._formatOutput(result);
var output = _this._formatOutput(result);
if (!err) {
// TODO remove eventEmitter??
if(output.removed)
_this.emit('changed', output);
else
_this.emit('data', output);
} else {
// unsubscribe, but keep listeners
_this.options.requestManager.removeSubscription(_this.id);
if (!err) {
// TODO remove eventEmitter??
if(output.removed)
_this.emit('changed', output);
else
_this.emit('data', output);
} else {
// unsubscribe, but keep listeners
_this.options.requestManager.removeSubscription(_this.id);
// re-subscribe, if connection fails
if(_this.options.requestManager.provider.once) {
_this._reconnectIntervalId = setInterval(function () {
_this.options.requestManager.provider.reconnect();
}, 500);
// re-subscribe, if connection fails
if(_this.options.requestManager.provider.once) {
_this._reconnectIntervalId = setInterval(function () {
_this.options.requestManager.provider.reconnect();
}, 500);
_this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this.subscribe.apply(_this, args);
});
}
_this.emit('error', err);
_this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this.subscribe.apply(_this, args);
});
}
_this.emit('error', err);
}
// call the callback, last so that unsubscribe there won't affect the emit above
// call the callback, last so that unsubscribe there won't affect the emit above
if (_.isFunction(_this.callback)) {
_this.callback(err, output, _this);
});
} else {
_this.callback(err, null, _this);
}
});
}
});
} else if (_.isFunction(_this.callback)) {
_this.callback(err, null, _this);
_this.emit('error', err);
}
});
// return an object to cancel the subscription
return this;
} else
throw new Error('Subscriptions require a callback as the last parameter!');
// return an object to cancel the subscription
return this;
};
module.exports = Subscription;