added .(re)subscribe to subscripton

This commit is contained in:
Fabian Vogelsteller 2017-03-03 22:55:48 +01:00
parent 7310b88b6a
commit ecbf1e7319
No known key found for this signature in database
GPG Key ID: E51EADA77F1A4124
5 changed files with 140 additions and 42 deletions

View File

@ -33,7 +33,9 @@ Returns
``EventEmitter`` - A Subscription instance
- ``subscription.id``: The subscription id, used to identify and unsubscribing the subscription.
- ``subscription.subscribe([callback])``: Can be used to re-subscribe with the same parameters.
- ``subscription.unsubscribe([callback])``: Unsubscribes the subscription and returns `TRUE` in the callback if successfull.
- ``subscription.arguments``: The subscription arguments, used when re-subscribing.
- ``on("data")`` returns ``Object``: Fires on each incoming log with the log object as argument.
- ``on("changed")`` returns ``Object``: Fires on each log which was removed from the blockchain. The log will have the additional property ``"removed: true"``.
- ``on("error")`` returns ``Object``: Fires when an error in the subscription occours.

View File

@ -7,7 +7,6 @@
"main": "src/index.js",
"dependencies": {
"web3-core-helpers": "^1.0.0",
"web3-providers-ipc": "^1.0.0",
"underscore": "^1.8.3",
"oboe": "^2.1.3",
"xmlhttprequest": "*",

View File

@ -0,0 +1,86 @@
/*
This file is part of web3.js.
web3.js is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
web3.js is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @file givenProvider.js
* @author Fabian Vogelsteller <fabian@ethereum.org>
* @date 2017
*/
"use strict";
var givenProvider = null;
// ADD GIVEN PROVIDER
/* jshint ignore:start */
var global = Function('return this')();
// EthereumProvider
if(typeof global.ethereumProvider !== 'undefined') {
givenProvider = global.ethereumProvider;
// Legacy web3.currentProvider
} else if(typeof global.web3 !== 'undefined' && global.web3.currentProvider) {
if(global.web3.currentProvider.sendAsync) {
global.web3.currentProvider.send = global.web3.currentProvider.sendAsync;
delete global.web3.currentProvider.sendAsync;
}
// if connection is 'ipcProviderWrapper', add subscription support
if(!global.web3.currentProvider.on &&
global.web3.currentProvider.connection &&
global.web3.currentProvider.connection.constructor.name === 'ipcProviderWrapper') {
global.web3.currentProvider.on = function (type, callback) {
if(typeof callback !== 'function')
throw new Error('The second parameter callback must be a function.');
switch(type){
case 'notification':
this.connection.on('data', function(data) {
var result = '';
data = data.toString();
try {
result = JSON.parse(data);
} catch(e) {
return callback(new Error('Couldn\'t parse response data'+ data));
}
// notification
if(!result.id && result.method.indexOf('_subscription') !== -1) {
callback(null, result);
}
});
break;
default:
this.connection.on(type, callback);
break;
}
};
}
givenProvider = global.web3.currentProvider;
}
/* jshint ignore:end */
module.exports = givenProvider;

View File

@ -24,9 +24,10 @@
var _ = require('underscore');
var Jsonrpc = require('./jsonrpc');
var errors = require('web3-core-helpers').errors;
var BatchManager = require('./batch');
var Jsonrpc = require('./jsonrpc.js');
var BatchManager = require('./batch.js');
var givenProvider = require('./givenProvider.js');
@ -44,31 +45,8 @@ var RequestManager = function RequestManager(provider) {
};
// ADD GIVEN PROVIDER
/* jshint ignore:start */
var IpcProvider = require('web3-providers-ipc');
var global = Function('return this')();
if(typeof global.ethereumProvider !== 'undefined') {
RequestManager.givenProvider = global.ethereumProvider;
} else if(typeof global.web3 !== 'undefined' && global.web3.currentProvider) {
// if connection object is available, create new provider
if (global.web3.currentProvider.connection) {
RequestManager.givenProvider = new IpcProvider('', global.web3.currentProvider.connection);
// otherwise subscription aren't available
} else {
if(global.web3.currentProvider.sendAsync) {
global.web3.currentProvider.send = global.web3.currentProvider.sendAsync;
delete global.web3.currentProvider.sendAsync;
}
RequestManager.givenProvider = global.web3.currentProvider;
}
}
/* jshint ignore:end */
RequestManager.givenProvider = givenProvider;
/**

View File

@ -31,6 +31,7 @@ var Subscription = function Subscription(options) {
var emitter = new EventEmitter();
this.id = null;
this.callback = null;
this.arguments = null;
this._reconnectIntervalId = null;
this.options = {
@ -86,7 +87,7 @@ Subscription.prototype._validateArgs = function (args) {
if(!subscription.params)
subscription.params = 0;
if (args.length !== subscription.params + 1) {
if (args.length !== subscription.params) {
throw errors.InvalidNumberOfParams(args.length, subscription.params + 1, args[0]);
}
};
@ -106,19 +107,13 @@ Subscription.prototype._formatInput = function (args) {
return args;
}
// replace subscription with given name
if (subscription.subscriptionName) {
args[0] = subscription.subscriptionName;
}
if (!subscription.inputFormatter) {
return args;
}
var formattedArgs = subscription.inputFormatter.map(function (formatter, index) {
return formatter ? formatter(args[index+1]) : args[index+1];
return formatter ? formatter(args[index]) : args[index];
});
formattedArgs.unshift(args[0]);
return formattedArgs;
};
@ -145,9 +140,33 @@ Subscription.prototype._formatOutput = function (result) {
* @return {Object}
*/
Subscription.prototype._toPayload = function (args) {
var params = [];
this.callback = this._extractCallback(args);
var params = this._formatInput(args);
this._validateArgs(params);
if (!this.subscriptionMethod) {
this.subscriptionMethod = args.shift();
// replace subscription with given name
if (this.options.subscription.subscriptionName) {
this.subscriptionMethod = this.options.subscription.subscriptionName;
}
}
if (!this.arguments) {
this.arguments = this._formatInput(args);
this._validateArgs(this.arguments);
args = []; // make empty after validation
}
// re-add subscriptionName
params.push(this.subscriptionMethod);
params = params.concat(this.arguments);
if (args.length) {
throw new Error('Only a callback is allowed as parameter on an already instantiated subscription.');
}
return {
method: this.options.type + '_subscribe',
@ -178,12 +197,25 @@ Subscription.prototype.unsubscribe = function(callback) {
*/
Subscription.prototype.subscribe = function() {
var _this = this;
var args = arguments;
var payload = this._toPayload(Array.prototype.slice.call(arguments));
var args = Array.prototype.slice.call(arguments);
var payload = this._toPayload(args);
if(!payload) {
return this;
}
// throw error, if provider doesnt support subscriptions
if(!this.options.requestManager.provider.on)
throw new Error('The current provider doesn\'t support subscriptions', this.options.requestManager.provider);
if(!this.options.requestManager.provider.on) {
var err = new Error('The current provider doesn\'t support subscriptions'+ this.options.requestManager.provider.constructor.name);
this.callback(err, null, this);
this.emit('error', err);
return this;
}
// if id is there unsubscribe first
if (this.id) {
this.unsubscribe();
}
// store the params in the options object
this.options.params = payload.params[1];
@ -241,12 +273,13 @@ Subscription.prototype.subscribe = function() {
// re-subscribe, if connection fails
if(_this.options.requestManager.provider.once) {
_this._reconnectIntervalId = setInterval(function () {
// TODO check if that makes sense!
_this.options.requestManager.provider.reconnect();
}, 500);
_this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this.subscribe.apply(_this, args);
_this.subscribe(_this.callback);
});
}
_this.emit('error', err);