improved async polling

This commit is contained in:
Fabian Vogelsteller 2015-06-08 14:38:16 +02:00
parent 16252f3de5
commit b6c49d4db7
5 changed files with 87 additions and 55 deletions

View File

@ -71,7 +71,7 @@ module.exports = {
ETH_SIGNATURE_LENGTH: 4,
ETH_UNITS: ETH_UNITS,
ETH_BIGNUMBER_ROUNDING_MODE: { ROUNDING_MODE: BigNumber.ROUND_DOWN },
ETH_POLLING_TIMEOUT: 1000,
ETH_POLLING_TIMEOUT: 1000/2,
defaultBlock: 'latest',
defaultAccount: undefined
};

View File

@ -77,11 +77,33 @@ var getOptions = function (options) {
/**
Adds the callback and sets up the methods, to iterate over the results.
@method addCallback
@method getLogsAtStart
@param {Object} self
@param {funciton} callback
@param {funciton}
*/
var addCallback = function(self, callback) {
var getLogsAtStart = function(self, callback){
// call getFilterLogs for the first watch callback start
if (!utils.isString(self.options)) {
self.get(function (err, messages) {
// don't send all the responses to all the watches again... just to self one
if (err) {
callback(err);
}
messages.forEach(function (message) {
callback(null, message);
});
});
}
};
/**
Adds the callback and sets up the methods, to iterate over the results.
@method pollFilter
@param {Object} self
*/
var pollFilter = function(self) {
var onMessage = function (error, messages) {
if (error) {
@ -98,24 +120,11 @@ var addCallback = function(self, callback) {
});
};
// call getFilterLogs on start
if (!utils.isString(self.options)) {
self.get(function (err, messages) {
// don't send all the responses to all the watches again... just to self one
if (err) {
callback(err);
}
messages.forEach(function (message) {
callback(null, message);
});
});
}
RequestManager.getInstance().startPolling({
method: self.implementation.poll.call,
params: [self.filterId],
}, self.filterId, onMessage, self.stopWatching.bind(self));
};
var Filter = function (options, methods, formatter) {
@ -127,20 +136,20 @@ var Filter = function (options, methods, formatter) {
this.options = getOptions(options);
this.implementation = implementation;
this.callbacks = [];
this.addCallbacks = [];
this.pollFilters = [];
this.formatter = formatter;
this.implementation.newFilter(this.options, function(error, id){
if(error) {
self.filterError = error;
self.addCallbacks.forEach(function(callback){
self.callbacks.forEach(function(callback){
callback(error);
});
} else if(self.addCallbacks) {
} else {
self.filterId = id;
self.addCallbacks.forEach(function(callback){
addCallback(self, callback);
// get filter logs at start
self.callbacks.forEach(function(callback){
getLogsAtStart(self, callback);
});
self.addCallbacks = [];
pollFilter(self);
}
});
};
@ -148,10 +157,12 @@ var Filter = function (options, methods, formatter) {
Filter.prototype.watch = function (callback) {
this.callbacks.push(callback);
if(this.filterId)
addCallback(this, callback);
else
this.addCallbacks.push(callback);
if(this.filterId) {
getLogsAtStart(this, callback);
pollFilter(this);
}
return this;
};
Filter.prototype.stopWatching = function () {
@ -179,6 +190,8 @@ Filter.prototype.get = function (callback) {
return self.formatter ? self.formatter(log) : log;
});
}
return this;
};
module.exports = Filter;

View File

@ -43,7 +43,7 @@ var RequestManager = function (provider) {
arguments.callee._singletonInstance = this;
this.provider = provider;
this.polls = [];
this.polls = {};
this.timeout = null;
this.poll();
};
@ -156,7 +156,7 @@ RequestManager.prototype.setProvider = function (p) {
* @todo cleanup number of params
*/
RequestManager.prototype.startPolling = function (data, pollId, callback, uninstall) {
this.polls.push({data: data, id: pollId, callback: callback, uninstall: uninstall});
this.polls['poll_'+ pollId] = {data: data, id: pollId, callback: callback, uninstall: uninstall};
};
/*jshint maxparams:3 */
@ -167,24 +167,21 @@ RequestManager.prototype.startPolling = function (data, pollId, callback, uninst
* @param {Number} pollId
*/
RequestManager.prototype.stopPolling = function (pollId) {
for (var i = this.polls.length; i--;) {
var poll = this.polls[i];
if (poll.id === pollId) {
this.polls.splice(i, 1);
}
}
delete this.polls['poll_'+ pollId];
};
/**
* Should be called to reset polling mechanism of request manager
* Should be called to reset the polling mechanism of the request manager
*
* @method reset
*/
RequestManager.prototype.reset = function () {
this.polls.forEach(function (poll) {
poll.uninstall(poll.id);
});
this.polls = [];
for (var key in this.polls) {
if (this.polls.hasOwnProperty(key)) {
this.polls[key].uninstall();
}
}
this.polls = {};
if (this.timeout) {
clearTimeout(this.timeout);
@ -201,7 +198,7 @@ RequestManager.prototype.reset = function () {
RequestManager.prototype.poll = function () {
this.timeout = setTimeout(this.poll.bind(this), c.ETH_POLLING_TIMEOUT);
if (!this.polls.length) {
if (this.polls === {}) {
return;
}
@ -210,9 +207,20 @@ RequestManager.prototype.poll = function () {
return;
}
var payload = Jsonrpc.getInstance().toBatchPayload(this.polls.map(function (data) {
return data.data;
}));
var pollsData = [];
var pollsKeys = [];
for (var key in this.polls) {
if (this.polls.hasOwnProperty(key)) {
pollsData.push(this.polls[key].data);
pollsKeys.push(key);
}
}
if (pollsData.length === 0) {
return;
}
var payload = Jsonrpc.getInstance().toBatchPayload(pollsData);
var self = this;
this.provider.sendAsync(payload, function (error, results) {
@ -220,22 +228,22 @@ RequestManager.prototype.poll = function () {
if (error) {
return;
}
if (!utils.isArray(results)) {
throw errors.InvalidResponse(results);
}
results.map(function (result, index) {
var key = pollsKeys[index];
// make sure the filter is still installed after arrival of the request
if(self.polls[index]) {
result.callback = self.polls[index].callback;
if(self.polls[key]) {
result.callback = self.polls[key].callback;
return result;
} else
return false;
}).filter(function (result) {
if(!result)
return false;
return (!result) ? false : true;
}).filter(function (result) {
var valid = Jsonrpc.getInstance().isValidResponse(result);
if (!valid) {
result.callback(errors.InvalidResponse(result));

View File

@ -66,7 +66,7 @@ describe('web3.eth.contract', function () {
provider.injectValidation(function (payload) {
if (step === 0) {
step = 1;
provider.injectResult(3);
provider.injectResult('0x3');
assert.equal(payload.jsonrpc, '2.0');
assert.equal(payload.method, 'eth_newFilter');
assert.deepEqual(payload.params[0], {
@ -105,7 +105,7 @@ describe('web3.eth.contract', function () {
'0000000000000000000000000000000000000000000000000000000000000008'
}]]);
var r = payload.filter(function (p) {
return p.jsonrpc === '2.0' && p.method === 'eth_getFilterChanges' && p.params[0] === 3;
return p.jsonrpc === '2.0' && p.method === 'eth_getFilterChanges' && p.params[0] === '0x3';
});
assert.equal(r.length > 0, true);
}
@ -114,7 +114,8 @@ describe('web3.eth.contract', function () {
var contract = web3.eth.contract(desc).at(address);
var res = 0;
contract.Changed({from: address}).watch(function(err, result) {
var event = contract.Changed({from: address});
event.watch(function(err, result) {
assert.equal(result.args.from, address);
assert.equal(result.args.amount, 1);
assert.equal(result.args.t1, 1);

View File

@ -15,11 +15,21 @@ FakeHttpProvider2.prototype.injectResultList = function (list) {
FakeHttpProvider2.prototype.getResponse = function () {
var result = this.resultList[this.counter];
this.counter++;
// add fallback result value
if(!result)
result = {
result: undefined
};
if (result.type === 'batch') {
this.injectBatchResults(result.result);
} else {
this.injectResult(result.result);
}
this.counter = 0;
return this.response;
};