mirror of https://github.com/status-im/consul.git
156 lines
5.9 KiB
JavaScript
156 lines
5.9 KiB
JavaScript
import { get } from '@ember/object';
|
|
|
|
// Fallback delay in milliseconds that `throttle` passes to setTimeout
// when `configuration.interval` is falsy
const pause = 2000;
|
|
// The native EventSource retry is a ~3s wait, so
// any errors specified here will mean that the blocking query attempts
// a reconnection every 3s until it reconnects to Consul
|
|
/**
 * Creates an error handler (intended for a Promise `catch`) that delays
 * recoverable errors and rethrows everything else.
 *
 * The status is read from `errors.firstObject.status` (ember-data like error)
 * or, failing that, from `statusCode` (http-like error). An error is treated
 * as recoverable when its stringified status is `'0'`, or is 3 characters
 * long, starts with `5` and is not `'500'`.
 *
 * @param {number} [ms=3000] - Delay handed to `wait` before the promise resolves
 * @param {Function} [P=Promise] - Promise constructor used for the returned promise
 * @param {Function} [wait=setTimeout] - Timer function, called as `wait(callback, ms)`
 * @returns {Function} Handler taking the error; returns a `P` that resolves
 *   with that same error once `wait` fires, or throws the error untouched
 *   when it is not recoverable
 */
export const createErrorBackoff = function(ms = 3000, P = Promise, wait = setTimeout) {
  return function(err) {
    // expect an ember-data error or a http-like error (e.statusCode)
    const rawStatus = get(err, 'errors.firstObject.status') || get(err, 'statusCode');
    // `!= null` rejects both `undefined` and `null`; stringifying a `null`
    // status would otherwise replace the original error with a TypeError
    if (rawStatus != null) {
      // ember-data errors are strings, http errors are numbers
      const status = String(rawStatus);
      // Any '5xx' (not 500) errors should back off and try again
      const isRetryable5xx = status.length === 3 && status.startsWith('5') && status !== '500';
      // Any 0 aborted connections should back off and try again
      const isAborted = status === '0';
      if (isRetryable5xx || isAborted) {
        // TODO: Move this to the view layer so we can show a connection error
        // and reconnection success to the user
        return new P(function(resolve) {
          wait(function() {
            resolve(err);
          }, ms);
        });
      }
    }
    // any other errors should throw to be picked up by an error listener/catch
    throw err;
  };
};
|
|
/**
 * Normalizes a cursor value so that it is always an integer of at least 1.
 *
 * @param {string|number} current - The cursor to validate, parsed as a base 10 integer
 * @param {number|null} [prev=null] - The previously used cursor, `null` when there is none
 * @returns {number|undefined} The validated cursor (never less than 1), or
 *   `undefined` when `current` cannot be parsed as an integer
 */
export const validateCursor = function(current, prev = null) {
  // always parse as base 10 so prefixed strings such as '0x1F' are not read as hex
  const parsed = Number.parseInt(current, 10);
  if (Number.isNaN(parsed)) {
    // nothing usable to validate
    return undefined;
  }
  // if the cursor is less than the previous cursor, reset to zero
  const cursor = prev !== null && parsed < prev ? 0 : parsed;
  // if the cursor is less than 1 (including the reset above), it's always safe to use 1
  return Math.max(cursor, 1);
};
|
|
/**
 * Builds a function that resolves with whatever value it is given, but only
 * after a delay. The delay is `configuration.interval`, or `pause` when that
 * is falsy, and is read each time the returned function is called.
 *
 * @param {Object} configuration - Object whose `interval` property is the delay
 * @param {*} prev - Accepted but not used
 * @param {*} current - Accepted but not used
 * @returns {Function} Function taking a value and returning a Promise that
 *   resolves with that same value once the delay has passed
 */
const throttle = function(configuration, prev, current) {
  return value =>
    new Promise(done => {
      const delay = configuration.interval || pause;
      setTimeout(() => {
        done(value);
      }, delay);
    });
};
|
|
/**
 * Default event factory: wraps a result in a `message` event.
 *
 * @param {*} result - Value to expose as the event's `data`
 * @param {Object} configuration - Accepted but not used
 * @returns {Object} `{ type: 'message', data: result }`
 */
const defaultCreateEvent = (result, configuration) => ({
  type: 'message',
  data: result,
});
|
|
/**
 * Creates a BlockingEventSource constructor that extends the given
 * EventSource constructor with cursor tracking, error backoff and throttling.
 *
 * @param {Function} EventSource - Constructor to extend. It is applied with
 *   `(callback, configuration)` and instances are expected to provide
 *   `dispatchEvent` and `addEventListener`
 * @param {Function} [backoff] - Default backoff function for all instances, defaults to `createErrorBackoff()`
 * @returns {Function} The BlockingEventSource constructor
 */
export default function(EventSource, backoff = createErrorBackoff()) {
  /**
   * An EventSource implementation to add native EventSource-like functionality with just callbacks (`cursor` and 5xx backoff)
   *
   * This includes:
   * 1. 5xx backoff support via the `backoff` function, which is attached to the `source` promise with `catch`
   * 2. A `cursor` configuration value. Current `cursor` is taken from the `meta` property of the event (i.e. `event.data.meta.cursor`)
   * 3. Event data can be customized by adding a `configuration.createEvent`
   *
   * @param {Function} [source] - Promise returning function that resolves your data,
   *   called with `(configuration, eventSource)` and with the instance as `this`
   * @param {Object} [configuration] - Plain configuration object:
   *   `cursor` - Cursor position of the EventSource
   *   `createEvent` - A data filter, giving you the opportunity to filter or replace the event data, such as removing/replacing records
   *   `currentEvent` - An event to seed the instance with, it is dispatched when `open` is dispatched
   */
  const BlockingEventSource = function(source, configuration = {}) {
    // `currentEvent` only seeds this instance, everything else is handed to EventSource
    const { currentEvent, ...config } = configuration;
    EventSource.apply(this, [
      requestConfiguration => {
        // `createEvent` is used here only and is not passed on to `source`
        const { createEvent, ...superConfiguration } = requestConfiguration;
        return source
          .apply(this, [superConfiguration, this])
          .catch(backoff)
          .then(result => {
            // the default backoff resolves with the error itself once it has
            // waited, pass it straight on without dispatching anything
            if (result instanceof Error) {
              return result;
            }
            const eventFactory =
              typeof createEvent === 'function' ? createEvent : defaultCreateEvent;
            let event = eventFactory(result, requestConfiguration);
            // allow custom types, but make a default of `message`, ideally this would check for CustomEvent
            // but keep this flexible for the moment
            if (!event.type) {
              event = {
                type: 'message',
                data: event,
              };
            }
            // meta is also configurable by using createEvent
            const meta = get(event.data || {}, 'meta');
            if (meta) {
              // pick off the `cursor` from the meta and add it to configuration
              // along with cursor validation
              requestConfiguration.cursor = validateCursor(
                meta.cursor,
                requestConfiguration.cursor
              );
              requestConfiguration.cacheControl = meta.cacheControl;
              requestConfiguration.interval = meta.interval;
            }
            // events marked `no-store` are dispatched but not remembered
            if ((requestConfiguration.cacheControl || '').indexOf('no-store') === -1) {
              this.currentEvent = event;
            }
            this.dispatchEvent(event);
            // throttle takes (configuration, prev, current)
            const throttledResolve = throttle(requestConfiguration, this.previousEvent, event);
            this.previousEvent = this.currentEvent;
            return throttledResolve(result);
          });
      },
      config,
    ]);
    if (typeof currentEvent !== 'undefined') {
      this.currentEvent = currentEvent;
    }
    // when `open` is dispatched and we already have a currentEvent
    // (for example one set via configuration), dispatch that event
    // so things are populated immediately
    this.addEventListener('open', e => {
      const existingEvent = e.target.getCurrentEvent();
      if (typeof existingEvent !== 'undefined') {
        this.dispatchEvent(existingEvent);
      }
    });
  };
  BlockingEventSource.prototype = Object.assign(
    Object.create(EventSource.prototype, {
      // point `constructor` at the subclass rather than the parent, so that
      // `instance.constructor` identifies BlockingEventSource
      constructor: {
        value: BlockingEventSource,
        configurable: true,
        writable: true,
      },
    }),
    {
      // if we are having these props, at least make getters
      getCurrentEvent: function() {
        return this.currentEvent;
      },
      getPreviousEvent: function() {
        return this.previousEvent;
      },
    }
  );
  return BlockingEventSource;
}
|