Dynamically add nodes on startup

This commit is contained in:
HenryNguyen5 2018-02-12 17:01:29 -05:00
parent 2c1474cb34
commit a9e6528a06
10 changed files with 213 additions and 118 deletions

View File

@ -11,7 +11,9 @@ import {
NodeOfflineAction,
NodeOnlineAction,
NodeRemovedAction,
TypeKeys
TypeKeys,
NetworkSwitchRequestedAction,
NetworkSwitchSucceededAction
} from 'actions/nodeBalancer';
export const balancerFlush = (): BalancerFlushAction => ({
@ -80,3 +82,11 @@ export const nodeCallSucceeded = (
type: TypeKeys.NODE_CALL_SUCCEEDED,
payload
});
export const networkSwitchRequested = (): NetworkSwitchRequestedAction => ({
type: TypeKeys.NETWORK_SWTICH_REQUESTED
});
export const networkSwitchSucceeded = (
payload: NetworkSwitchSucceededAction['payload']
): NetworkSwitchSucceededAction => ({ type: TypeKeys.NETWORK_SWITCH_SUCCEEDED, payload });

View File

@ -2,6 +2,7 @@ import { TypeKeys } from './constants';
import { Task } from 'redux-saga';
import { INodeStats } from 'reducers/nodeBalancer/nodes';
import { StaticNodeId } from 'types/node';
import { State as NodeBalancerState } from 'reducers/nodeBalancer';
export type AllNodeIds = StaticNodeId | string;
@ -61,6 +62,18 @@ export interface WorkerProcessingAction {
};
}
export interface NetworkSwitchRequestedAction {
type: TypeKeys.NETWORK_SWTICH_REQUESTED;
}
export interface NetworkSwitchSucceededAction {
type: TypeKeys.NETWORK_SWITCH_SUCCEEDED;
payload: {
nodeStats: NodeBalancerState['nodes'];
workers: NodeBalancerState['workers'];
};
}
export interface WorkerKilledAction {
type: TypeKeys.WORKER_KILLED;
payload: {
@ -90,7 +103,10 @@ export interface NodeCallSucceededAction {
payload: { result: string; nodeCall: NodeCall };
}
export type BalancerAction = BalancerFlushAction;
export type BalancerAction =
| BalancerFlushAction
| NetworkSwitchRequestedAction
| NetworkSwitchSucceededAction;
export type NodeAction = NodeOnlineAction | NodeOfflineAction | NodeAddedAction | NodeRemovedAction;

View File

@ -13,5 +13,8 @@ export enum TypeKeys {
NODE_CALL_REQUESTED = 'NODE_CALL_REQUESTED',
NODE_CALL_TIMEOUT = 'NODE_CALL_TIMEOUT',
NODE_CALL_SUCCEEDED = 'NODE_CALL_SUCCEEDED',
NODE_CALL_FAILED = 'NODE_CALL_FAILED'
NODE_CALL_FAILED = 'NODE_CALL_FAILED',
NETWORK_SWTICH_REQUESTED = 'NETWORK_SWTICH_REQUESTED',
NETWORK_SWITCH_SUCCEEDED = 'NETWORK_SWITCH_SUCCEEDED'
}

View File

@ -1,5 +1,10 @@
import { ChangeLanguageAction, SetLatestBlockAction, MetaAction } from 'actions/config';
import { TypeKeys } from 'actions/config/constants';
import {
NetworkSwitchRequestedAction,
TypeKeys as NodeBalancerTypeKeys,
NodeBalancerAction
} from 'actions/nodeBalancer';
export interface State {
languageSelection: string;
@ -15,6 +20,13 @@ const INITIAL_STATE: State = {
latestBlock: '???'
};
function handleNetworkSwitchRequested(state: State, _: NetworkSwitchRequestedAction) {
return {
...state,
offline: true
};
}
function changeLanguage(state: State, action: ChangeLanguageAction): State {
return {
...state,
@ -43,18 +55,22 @@ function setLatestBlock(state: State, action: SetLatestBlockAction): State {
};
}
export function meta(state: State = INITIAL_STATE, action: MetaAction): State {
export function meta(state: State = INITIAL_STATE, action: MetaAction | NodeBalancerAction): State {
switch (action.type) {
case TypeKeys.CONFIG_LANGUAGE_CHANGE:
return changeLanguage(state, action);
case TypeKeys.CONFIG_TOGGLE_OFFLINE:
return toggleOffline(state);
case TypeKeys.CONFIG_TOGGLE_AUTO_GAS_LIMIT:
return toggleAutoGasLimitEstimation(state);
case TypeKeys.CONFIG_SET_LATEST_BLOCK:
return setLatestBlock(state, action);
case NodeBalancerTypeKeys.NETWORK_SWTICH_REQUESTED:
return handleNetworkSwitchRequested(state, action);
default:
return state;
}

View File

@ -12,11 +12,16 @@ import {
NodeCallAction,
BalancerFlushAction,
BalancerAction,
NodeRemovedAction
NodeRemovedAction,
NetworkSwitchRequestedAction,
NetworkSwitchSucceededAction
} from 'actions/nodeBalancer';
import { TypeKeys } from 'actions/nodeBalancer/constants';
import { configuredStore } from 'store';
import { getNodeConfig } from 'selectors/config';
export interface INodeStats {
isCustom: boolean;
maxWorkers: number;
currWorkersById: string[];
timeoutThresholdMs: number;
@ -32,74 +37,12 @@ export interface State {
}
// hard code in the nodes for now
const INITIAL_STATE: State = {
eth_mycrypto: {
avgResponseTime: 1,
currWorkersById: [],
timeoutThresholdMs: 1000,
isOffline: false,
maxWorkers: 5,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
},
eth_ethscan: {
avgResponseTime: 1,
currWorkersById: [],
timeoutThresholdMs: 1000,
isOffline: false,
maxWorkers: 5,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
},
eth_infura: {
avgResponseTime: 1,
currWorkersById: [],
timeoutThresholdMs: 1000,
isOffline: false,
maxWorkers: 5,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
}
};
const INITIAL_STATE: State = {};
const handleNetworkSwitch: Reducer<State> = (
_: State,
{ payload: { nodeStats } }: NetworkSwitchSucceededAction
) => nodeStats;
const handleWorkerKilled: Reducer<State> = (
state: State,
@ -197,6 +140,9 @@ export const nodes: Reducer<State> = (
return handleNodeCallTimeout(state, action);
case TypeKeys.BALANCER_FLUSH:
return handleBalancerFlush(state, action);
case TypeKeys.NETWORK_SWITCH_SUCCEEDED:
return handleNetworkSwitch(state, action);
default:
return state;
}

View File

@ -8,12 +8,14 @@ import {
NodeCallSucceededAction,
WorkerAction,
NodeCallAction,
NodeCallTimeoutAction
NodeCallTimeoutAction,
NetworkSwitchSucceededAction,
BalancerAction
} from 'actions/nodeBalancer';
import { Reducer } from 'redux';
import { TypeKeys } from 'actions/nodeBalancer/constants';
interface IWorker {
export interface IWorker {
task: Task;
assignedNode: AllNodeIds;
currentPayload: NodeCall | null;
@ -25,6 +27,9 @@ export interface State {
const INITIAL_STATE: State = {};
const handleNetworkSwitch: Reducer<State> = (_: State, { payload }: NetworkSwitchSucceededAction) =>
payload.workers;
const handleWorkerKilled: Reducer<State> = (state: State, { payload }: WorkerKilledAction) => {
const stateCopy = { ...state };
Reflect.deleteProperty(stateCopy, payload.workerId);
@ -81,9 +86,11 @@ const handleNodeCallTimeout: Reducer<State> = (
export const workers: Reducer<State> = (
state: State = INITIAL_STATE,
action: WorkerAction | NodeCallAction
action: WorkerAction | NodeCallAction | BalancerAction
): State => {
switch (action.type) {
case TypeKeys.NETWORK_SWITCH_SUCCEEDED:
return handleNetworkSwitch(state, action);
case TypeKeys.WORKER_SPAWNED:
return handleWorkerSpawned(state, action);
case TypeKeys.WORKER_KILLED:

View File

@ -46,7 +46,7 @@ export function* pollOfflineStatus(): SagaIterator {
const shouldPing = !hasCheckedOnline || navigator.onLine === isOffline;
if (shouldPing && !document.hidden) {
const { pingSucceeded } = yield race({
pingSucceeded: call(nodeConfig.lib.ping.bind(nodeConfig.lib)),
pingSucceeded: call(nodeConfig.lib.ping),
timeout: call(delay, 5000)
});
if (pingSucceeded && isOffline) {

View File

@ -1,7 +1,6 @@
import { delay, SagaIterator, buffers, channel, Task, Channel, takeEvery } from 'redux-saga';
import {
call,
cancel,
fork,
put,
take,
@ -29,19 +28,28 @@ import {
NodeCallTimeoutAction,
NodeOfflineAction,
nodeOnline,
BalancerFlushAction
BalancerFlushAction,
balancerFlush,
networkSwitchRequested,
NetworkSwitchSucceededAction,
networkSwitchSucceeded
} from 'actions/nodeBalancer';
import {
getAvailableNodes,
AvailableNodes,
getNodeStatsById,
getAllMethodsAvailable,
getAvailableNodeId
} from 'selectors/nodeBalancer';
import { getOffline, getNodeById } from 'selectors/config';
import {
getOffline,
getNodeById,
getAllNodesOfNetworkId,
getNetworkConfig,
getSelectedNetwork
} from 'selectors/config';
import { toggleOffline } from 'actions/config';
import { StaticNodeConfig, CustomNodeConfig, NodeConfig } from '../../../shared/types/node';
import { INodeStats } from 'reducers/nodeBalancer/nodes';
import { IWorker } from 'reducers/nodeBalancer/workers';
// need to check this arbitary number
const MAX_NODE_CALL_TIMEOUTS = 3;
@ -63,36 +71,93 @@ interface IChannels {
const channels: IChannels = {};
function* initAndChannelNodePool(): SagaIterator {
console.log('Initializing channel and node pool started');
const availableNodes: AvailableNodes = yield select(getAvailableNodes);
const availableNodesArr = Object.entries(availableNodes);
function* networkSwitch(): SagaIterator {
yield put(networkSwitchRequested());
// if there are no available nodes during the initialization, put the app in an offline state
if (availableNodesArr.length === 0) {
const isOffline: boolean = yield select(getOffline);
if (!isOffline) {
yield put(toggleOffline());
}
}
//flush all existing requests
yield put(balancerFlush());
const network: string = yield select(getSelectedNetwork);
const nodes: {
[x: string]: NodeConfig;
} = yield select(getAllNodesOfNetworkId, network);
interface Workers {
[workerId: string]: IWorker;
}
/**
*
* @description Handles checking if a node is online or not, and adding it to the node balancer
* @param {string} nodeId
* @param {NodeConfig} nodeConfig
*/
function* handleAddingNode(nodeId: string, nodeConfig: NodeConfig) {
const startTime = new Date();
const nodeIsOnline: boolean = yield call(checkNodeConnectivity, nodeId, false);
const endTime = new Date();
const avgResponseTime = +endTime - +startTime;
const stats: INodeStats = {
avgResponseTime,
isOffline: !nodeIsOnline,
isCustom: nodeConfig.isCustom,
timeoutThresholdMs: 2000,
currWorkersById: [],
maxWorkers: 3,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
};
// make a channel per available node and init its workers up to the maxiumum allowed workers
for (const [nodeId, nodeConfig] of availableNodesArr) {
const nodeChannel: Channel<NodeCall> = yield call(channel, buffers.expanding(10));
channels[nodeId] = nodeChannel;
const workers: Workers = {};
for (
let workerNumber = nodeConfig.currWorkersById.length;
workerNumber < nodeConfig.maxWorkers;
let workerNumber = stats.currWorkersById.length;
workerNumber < stats.maxWorkers;
workerNumber++
) {
const workerId = `${nodeId}_worker_${workerNumber}`;
const workerTask: Task = yield spawn(spawnWorker, workerId, nodeId, nodeChannel);
console.log(`Worker ${workerId} spawned for ${nodeId}`);
yield put(workerSpawned({ nodeId, workerId, task: workerTask }));
stats.currWorkersById.push(workerId);
const worker: IWorker = { assignedNode: nodeId, currentPayload: null, task: workerTask };
workers[workerId] = worker;
}
return { nodeId, stats, workers };
}
console.log('Initializing channel and node pool finished');
const nodeEntries = Object.entries(nodes).map(([nodeId, nodeConfig]) =>
call(handleAddingNode, nodeId, nodeConfig)
);
// process adding all nodes in parallel
const processedNodes: { nodeId: string; stats: INodeStats; workers: Workers }[] = yield all(
nodeEntries
);
const networkSwitchPayload = processedNodes.reduce(
(accu, currNode) => ({
nodeStats: { ...accu.nodeStats, [currNode.nodeId]: currNode.stats },
workers: { ...accu.workers, ...currNode.workers }
}),
{} as NetworkSwitchSucceededAction['payload']
);
yield put(networkSwitchSucceeded(networkSwitchPayload));
}
function* handleNodeCallRequests(): SagaIterator {
@ -100,7 +165,9 @@ function* handleNodeCallRequests(): SagaIterator {
while (true) {
const { payload }: NodeCallRequestedAction = yield take(requestChan);
// check if the app is offline
if (yield select(getOffline)) {
yield call(delay, 2000);
}
// wait until its back online
// get an available nodeId to put the action to the channel
@ -148,7 +215,11 @@ function* handleCallTimeouts({
}
}
function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) {
/**
* @description polls the offline state of a node, then returns control to caller when it comes back online
* @param {string} nodeId
*/
function* checkNodeConnectivity(nodeId: string, poll: boolean = true) {
const nodeConfig: NodeConfig = yield select(getNodeById, nodeId);
while (true) {
try {
@ -159,6 +230,22 @@ function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) {
});
if (lb) {
console.log(`${nodeId} online!`);
return true;
}
} catch (error) {
if (!poll) {
return false;
}
yield call(delay, 5000);
console.info(error);
}
console.log(`${nodeId} still offline`);
}
}
function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) {
yield call(checkNodeConnectivity, nodeId);
yield put(nodeOnline({ nodeId }));
// check if all methods are available after this node is online
@ -171,14 +258,6 @@ function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) {
yield put(toggleOffline());
}
}
}
} catch (error) {
yield call(delay, 5000);
console.info(error);
}
console.log(`${nodeId} still offline`);
}
}
function* spawnWorker(thisId: string, nodeId: string, chan: IChannels[string]) {
@ -287,7 +366,7 @@ function* flushHandler(_: BalancerFlushAction): SagaIterator {
export function* nodeBalancer() {
yield all([
call(initAndChannelNodePool),
call(networkSwitch),
takeEvery(TypeKeys.NODE_OFFLINE, watchOfflineNode),
fork(handleNodeCallRequests),
takeEvery(TypeKeys.NODE_CALL_TIMEOUT, handleCallTimeouts),

View File

@ -9,7 +9,8 @@ import {
StaticNodeConfig,
StaticNodeId,
Web3NodeConfig,
StaticNodeWithWeb3Id
StaticNodeWithWeb3Id,
NodeConfig
} from 'types/node';
const getConfig = (state: AppState) => state.config;
@ -49,6 +50,22 @@ export const getNodeById = (state: AppState, nodeId: string) =>
? getStaticNodeFromId(state, nodeId)
: getCustomNodeFromId(state, nodeId);
const getNodesOfNetworkId = (nodes: { [key: string]: NodeConfig }, networkId: string) => {
const allNodesOfNetworkId: { [key: string]: NodeConfig } = {};
return Object.entries(nodes).reduce((allNodes, [currNodeId, currNodeConfig]) => {
if (currNodeConfig.network !== networkId) {
return allNodes;
}
return { ...allNodes, [currNodeId]: currNodeConfig };
}, allNodesOfNetworkId);
};
export const getAllNodesOfNetworkId = (state: AppState, networkId: string) => {
const staticNodesOfNetwork = getNodesOfNetworkId(getStaticNodeConfigs(state), networkId);
const customNodesOfNetwork = getNodesOfNetworkId(getCustomNodeConfigs(state), networkId);
return { ...staticNodesOfNetwork, ...customNodesOfNetwork };
};
export const isStaticNodeId = (state: AppState, nodeId: string): nodeId is StaticNodeWithWeb3Id =>
Object.keys(getStaticNodeConfigs(state)).includes(nodeId);

View File

@ -2,6 +2,7 @@ import { AppState } from 'reducers';
import { State as NodeBalancerState, INodeStats } from 'reducers/nodeBalancer/nodes';
import { Omit } from 'react-redux';
import { NodeCall } from 'actions/nodeBalancer';
import { getNodeById } from 'selectors/config';
const allMethods = [
'client',
@ -18,7 +19,7 @@ const allMethods = [
];
export const getNodeBalancer = (state: AppState) => state.nodeBalancer;
export const getNodes = (state: AppState) => getNodeBalancer(state).nodes;
export const getNodesState = (state: AppState) => getNodeBalancer(state).nodes;
export type AvailableNodes = {
[nodeId in keyof NodeBalancerState]: Omit<NodeBalancerState[nodeId], 'isOffline'> & {
@ -31,7 +32,7 @@ export type AvailableNodes = {
* @param state
*/
export const getAvailableNodes = (state: AppState): AvailableNodes => {
const nodes = getNodes(state);
const nodes = getNodesState(state);
const initialState: AvailableNodes = {};
const isAvailable = (node: NodeBalancerState[string]): node is AvailableNodes['string'] =>
@ -124,4 +125,4 @@ export const getWorkersById = (state: AppState, workerId: string) => getWorkers(
export const getNodeStatsById = (
state: AppState,
nodeId: string
): Readonly<INodeStats> | undefined => getNodes(state)[nodeId];
): Readonly<INodeStats> | undefined => getNodesState(state)[nodeId];