From a9e6528a06b6e9b370176d2166d2db44f91ec880 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 Date: Mon, 12 Feb 2018 17:01:29 -0500 Subject: [PATCH] Dynamically add nodes on startup --- common/actions/nodeBalancer/actionCreators.ts | 12 +- common/actions/nodeBalancer/actionTypes.ts | 18 ++- common/actions/nodeBalancer/constants.ts | 5 +- common/reducers/config/meta/meta.ts | 18 ++- common/reducers/nodeBalancer/nodes.ts | 84 ++-------- common/reducers/nodeBalancer/workers.ts | 13 +- common/sagas/config/node.ts | 2 +- common/sagas/node/node.ts | 153 +++++++++++++----- common/selectors/config/nodes.ts | 19 ++- common/selectors/nodeBalancer/index.ts | 7 +- 10 files changed, 213 insertions(+), 118 deletions(-) diff --git a/common/actions/nodeBalancer/actionCreators.ts b/common/actions/nodeBalancer/actionCreators.ts index 0000d700..ca56dfef 100644 --- a/common/actions/nodeBalancer/actionCreators.ts +++ b/common/actions/nodeBalancer/actionCreators.ts @@ -11,7 +11,9 @@ import { NodeOfflineAction, NodeOnlineAction, NodeRemovedAction, - TypeKeys + TypeKeys, + NetworkSwitchRequestedAction, + NetworkSwitchSucceededAction } from 'actions/nodeBalancer'; export const balancerFlush = (): BalancerFlushAction => ({ @@ -80,3 +82,11 @@ export const nodeCallSucceeded = ( type: TypeKeys.NODE_CALL_SUCCEEDED, payload }); + +export const networkSwitchRequested = (): NetworkSwitchRequestedAction => ({ + type: TypeKeys.NETWORK_SWTICH_REQUESTED +}); + +export const networkSwitchSucceeded = ( + payload: NetworkSwitchSucceededAction['payload'] +): NetworkSwitchSucceededAction => ({ type: TypeKeys.NETWORK_SWITCH_SUCCEEDED, payload }); diff --git a/common/actions/nodeBalancer/actionTypes.ts b/common/actions/nodeBalancer/actionTypes.ts index 821bb793..069173ff 100644 --- a/common/actions/nodeBalancer/actionTypes.ts +++ b/common/actions/nodeBalancer/actionTypes.ts @@ -2,6 +2,7 @@ import { TypeKeys } from './constants'; import { Task } from 'redux-saga'; import { INodeStats } from 'reducers/nodeBalancer/nodes'; import { StaticNodeId } from 'types/node'; +import { State as NodeBalancerState } from 'reducers/nodeBalancer'; export type AllNodeIds = StaticNodeId | string; @@ -61,6 +62,18 @@ export interface WorkerProcessingAction { }; } +export interface NetworkSwitchRequestedAction { + type: TypeKeys.NETWORK_SWTICH_REQUESTED; +} + +export interface NetworkSwitchSucceededAction { + type: TypeKeys.NETWORK_SWITCH_SUCCEEDED; + payload: { + nodeStats: NodeBalancerState['nodes']; + workers: NodeBalancerState['workers']; + }; +} + export interface WorkerKilledAction { type: TypeKeys.WORKER_KILLED; payload: { @@ -90,7 +103,10 @@ export interface NodeCallSucceededAction { payload: { result: string; nodeCall: NodeCall }; } -export type BalancerAction = BalancerFlushAction; +export type BalancerAction = + | BalancerFlushAction + | NetworkSwitchRequestedAction + | NetworkSwitchSucceededAction; export type NodeAction = NodeOnlineAction | NodeOfflineAction | NodeAddedAction | NodeRemovedAction; diff --git a/common/actions/nodeBalancer/constants.ts b/common/actions/nodeBalancer/constants.ts index 05d6386f..e5567bd4 100644 --- a/common/actions/nodeBalancer/constants.ts +++ b/common/actions/nodeBalancer/constants.ts @@ -13,5 +13,8 @@ export enum TypeKeys { NODE_CALL_REQUESTED = 'NODE_CALL_REQUESTED', NODE_CALL_TIMEOUT = 'NODE_CALL_TIMEOUT', NODE_CALL_SUCCEEDED = 'NODE_CALL_SUCCEEDED', - NODE_CALL_FAILED = 'NODE_CALL_FAILED' + NODE_CALL_FAILED = 'NODE_CALL_FAILED', + + NETWORK_SWTICH_REQUESTED = 'NETWORK_SWTICH_REQUESTED', + NETWORK_SWITCH_SUCCEEDED = 'NETWORK_SWITCH_SUCCEEDED' } diff --git a/common/reducers/config/meta/meta.ts b/common/reducers/config/meta/meta.ts index 7cdbb23b..b53231e7 100644 --- a/common/reducers/config/meta/meta.ts +++ b/common/reducers/config/meta/meta.ts @@ -1,5 +1,10 @@ import { ChangeLanguageAction, SetLatestBlockAction, MetaAction } from 'actions/config'; import { TypeKeys } from 'actions/config/constants'; +import { + NetworkSwitchRequestedAction, + TypeKeys as NodeBalancerTypeKeys, + NodeBalancerAction +} from 'actions/nodeBalancer'; export interface State { languageSelection: string; @@ -15,6 +20,13 @@ const INITIAL_STATE: State = { latestBlock: '???' }; +function handleNetworkSwitchRequested(state: State, _: NetworkSwitchRequestedAction) { + return { + ...state, + offline: true + }; +} + function changeLanguage(state: State, action: ChangeLanguageAction): State { return { ...state, @@ -43,18 +55,22 @@ function setLatestBlock(state: State, action: SetLatestBlockAction): State { }; } -export function meta(state: State = INITIAL_STATE, action: MetaAction): State { +export function meta(state: State = INITIAL_STATE, action: MetaAction | NodeBalancerAction): State { switch (action.type) { case TypeKeys.CONFIG_LANGUAGE_CHANGE: return changeLanguage(state, action); case TypeKeys.CONFIG_TOGGLE_OFFLINE: return toggleOffline(state); + case TypeKeys.CONFIG_TOGGLE_AUTO_GAS_LIMIT: return toggleAutoGasLimitEstimation(state); case TypeKeys.CONFIG_SET_LATEST_BLOCK: return setLatestBlock(state, action); + + case NodeBalancerTypeKeys.NETWORK_SWTICH_REQUESTED: + return handleNetworkSwitchRequested(state, action); default: return state; } diff --git a/common/reducers/nodeBalancer/nodes.ts b/common/reducers/nodeBalancer/nodes.ts index b1d48053..0f1f031c 100644 --- a/common/reducers/nodeBalancer/nodes.ts +++ b/common/reducers/nodeBalancer/nodes.ts @@ -12,11 +12,16 @@ import { NodeCallAction, BalancerFlushAction, BalancerAction, - NodeRemovedAction + NodeRemovedAction, + NetworkSwitchRequestedAction, + NetworkSwitchSucceededAction } from 'actions/nodeBalancer'; import { TypeKeys } from 'actions/nodeBalancer/constants'; +import { configuredStore } from 'store'; +import { getNodeConfig } from 'selectors/config'; export interface INodeStats { + isCustom: boolean; maxWorkers: number; currWorkersById: string[]; timeoutThresholdMs: number; @@ -32,74 +37,12 @@ export interface State { } // hard code in the nodes for now -const INITIAL_STATE: State = { - eth_mycrypto: { - avgResponseTime: 1, - currWorkersById: [], - timeoutThresholdMs: 1000, - isOffline: false, - maxWorkers: 5, - requestFailures: 0, - requestFailureThreshold: 2, - supportedMethods: { - client: true, - requests: true, - ping: true, - sendCallRequest: true, - getBalance: true, - estimateGas: true, - getTokenBalance: true, - getTokenBalances: true, - getTransactionCount: true, - getCurrentBlock: true, - sendRawTx: true - } - }, - eth_ethscan: { - avgResponseTime: 1, - currWorkersById: [], - timeoutThresholdMs: 1000, - isOffline: false, - maxWorkers: 5, - requestFailures: 0, - requestFailureThreshold: 2, - supportedMethods: { - client: true, - requests: true, - ping: true, - sendCallRequest: true, - getBalance: true, - estimateGas: true, - getTokenBalance: true, - getTokenBalances: true, - getTransactionCount: true, - getCurrentBlock: true, - sendRawTx: true - } - }, - eth_infura: { - avgResponseTime: 1, - currWorkersById: [], - timeoutThresholdMs: 1000, - isOffline: false, - maxWorkers: 5, - requestFailures: 0, - requestFailureThreshold: 2, - supportedMethods: { - client: true, - requests: true, - ping: true, - sendCallRequest: true, - getBalance: true, - estimateGas: true, - getTokenBalance: true, - getTokenBalances: true, - getTransactionCount: true, - getCurrentBlock: true, - sendRawTx: true - } - } -}; +const INITIAL_STATE: State = {}; + +const handleNetworkSwitch: Reducer = ( + _: State, + { payload: { nodeStats } }: NetworkSwitchSucceededAction +) => nodeStats; const handleWorkerKilled: Reducer = ( state: State, @@ -197,6 +140,9 @@ export const nodes: Reducer = ( return handleNodeCallTimeout(state, action); case TypeKeys.BALANCER_FLUSH: return handleBalancerFlush(state, action); + + case TypeKeys.NETWORK_SWITCH_SUCCEEDED: + return handleNetworkSwitch(state, action); default: return state; } diff --git a/common/reducers/nodeBalancer/workers.ts b/common/reducers/nodeBalancer/workers.ts index 753db39e..b3ea0042 100644 --- a/common/reducers/nodeBalancer/workers.ts +++ b/common/reducers/nodeBalancer/workers.ts @@ -8,12 +8,14 @@ import { NodeCallSucceededAction, WorkerAction, NodeCallAction, - NodeCallTimeoutAction + NodeCallTimeoutAction, + NetworkSwitchSucceededAction, + BalancerAction } from 'actions/nodeBalancer'; import { Reducer } from 'redux'; import { TypeKeys } from 'actions/nodeBalancer/constants'; -interface IWorker { +export interface IWorker { task: Task; assignedNode: AllNodeIds; currentPayload: NodeCall | null; @@ -25,6 +27,9 @@ export interface State { const INITIAL_STATE: State = {}; +const handleNetworkSwitch: Reducer = (_: State, { payload }: NetworkSwitchSucceededAction) => + payload.workers; + const handleWorkerKilled: Reducer = (state: State, { payload }: WorkerKilledAction) => { const stateCopy = { ...state }; Reflect.deleteProperty(stateCopy, payload.workerId); @@ -81,9 +86,11 @@ const handleNodeCallTimeout: Reducer = ( export const workers: Reducer = ( state: State = INITIAL_STATE, - action: WorkerAction | NodeCallAction + action: WorkerAction | NodeCallAction | BalancerAction ): State => { switch (action.type) { + case TypeKeys.NETWORK_SWITCH_SUCCEEDED: + return handleNetworkSwitch(state, action); case TypeKeys.WORKER_SPAWNED: return handleWorkerSpawned(state, action); case TypeKeys.WORKER_KILLED: diff --git a/common/sagas/config/node.ts b/common/sagas/config/node.ts index a9a329bf..2565a1c8 100644 --- a/common/sagas/config/node.ts +++ b/common/sagas/config/node.ts @@ -46,7 +46,7 @@ export function* pollOfflineStatus(): SagaIterator { const shouldPing = !hasCheckedOnline || navigator.onLine === isOffline; if (shouldPing && !document.hidden) { const { pingSucceeded } = yield race({ - pingSucceeded: call(nodeConfig.lib.ping.bind(nodeConfig.lib)), + pingSucceeded: call(nodeConfig.lib.ping), timeout: call(delay, 5000) }); if (pingSucceeded && isOffline) { diff --git a/common/sagas/node/node.ts b/common/sagas/node/node.ts index a2cc1c2a..7adc353e 100644 --- a/common/sagas/node/node.ts +++ b/common/sagas/node/node.ts @@ -1,7 +1,6 @@ import { delay, SagaIterator, buffers, channel, Task, Channel, takeEvery } from 'redux-saga'; import { call, - cancel, fork, put, take, @@ -29,19 +28,28 @@ import { NodeCallTimeoutAction, NodeOfflineAction, nodeOnline, - BalancerFlushAction + BalancerFlushAction, + balancerFlush, + networkSwitchRequested, + NetworkSwitchSucceededAction, + networkSwitchSucceeded } from 'actions/nodeBalancer'; import { - getAvailableNodes, - AvailableNodes, getNodeStatsById, getAllMethodsAvailable, getAvailableNodeId } from 'selectors/nodeBalancer'; -import { getOffline, getNodeById } from 'selectors/config'; +import { + getOffline, + getNodeById, + getAllNodesOfNetworkId, + getNetworkConfig, + getSelectedNetwork +} from 'selectors/config'; import { toggleOffline } from 'actions/config'; import { StaticNodeConfig, CustomNodeConfig, NodeConfig } from '../../../shared/types/node'; import { INodeStats } from 'reducers/nodeBalancer/nodes'; +import { IWorker } from 'reducers/nodeBalancer/workers'; // need to check this arbitary number const MAX_NODE_CALL_TIMEOUTS = 3; @@ -63,36 +71,93 @@ interface IChannels { const channels: IChannels = {}; -function* initAndChannelNodePool(): SagaIterator { - console.log('Initializing channel and node pool started'); - const availableNodes: AvailableNodes = yield select(getAvailableNodes); - const availableNodesArr = Object.entries(availableNodes); +function* networkSwitch(): SagaIterator { + yield put(networkSwitchRequested()); - // if there are no available nodes during the initialization, put the app in an offline state - if (availableNodesArr.length === 0) { - const isOffline: boolean = yield select(getOffline); - if (!isOffline) { - yield put(toggleOffline()); - } + //flush all existing requests + yield put(balancerFlush()); + + const network: string = yield select(getSelectedNetwork); + const nodes: { + [x: string]: NodeConfig; + } = yield select(getAllNodesOfNetworkId, network); + + interface Workers { + [workerId: string]: IWorker; } + /** + * + * @description Handles checking if a node is online or not, and adding it to the node balancer + * @param {string} nodeId + * @param {NodeConfig} nodeConfig + */ + function* handleAddingNode(nodeId: string, nodeConfig: NodeConfig) { + const startTime = new Date(); + const nodeIsOnline: boolean = yield call(checkNodeConnectivity, nodeId, false); + const endTime = new Date(); + const avgResponseTime = +endTime - +startTime; + const stats: INodeStats = { + avgResponseTime, + isOffline: !nodeIsOnline, + isCustom: nodeConfig.isCustom, + timeoutThresholdMs: 2000, + currWorkersById: [], + maxWorkers: 3, + requestFailures: 0, + requestFailureThreshold: 2, + supportedMethods: { + client: true, + requests: true, + ping: true, + sendCallRequest: true, + getBalance: true, + estimateGas: true, + getTokenBalance: true, + getTokenBalances: true, + getTransactionCount: true, + getCurrentBlock: true, + sendRawTx: true + } + }; - // make a channel per available node and init its workers up to the maxiumum allowed workers - for (const [nodeId, nodeConfig] of availableNodesArr) { const nodeChannel: Channel = yield call(channel, buffers.expanding(10)); channels[nodeId] = nodeChannel; + const workers: Workers = {}; for ( - let workerNumber = nodeConfig.currWorkersById.length; - workerNumber < nodeConfig.maxWorkers; + let workerNumber = stats.currWorkersById.length; + workerNumber < stats.maxWorkers; workerNumber++ ) { const workerId = `${nodeId}_worker_${workerNumber}`; const workerTask: Task = yield spawn(spawnWorker, workerId, nodeId, nodeChannel); console.log(`Worker ${workerId} spawned for ${nodeId}`); - yield put(workerSpawned({ nodeId, workerId, task: workerTask })); + stats.currWorkersById.push(workerId); + const worker: IWorker = { assignedNode: nodeId, currentPayload: null, task: workerTask }; + workers[workerId] = worker; } + + return { nodeId, stats, workers }; } - console.log('Initializing channel and node pool finished'); + + const nodeEntries = Object.entries(nodes).map(([nodeId, nodeConfig]) => + call(handleAddingNode, nodeId, nodeConfig) + ); + + // process adding all nodes in parallel + const processedNodes: { nodeId: string; stats: INodeStats; workers: Workers }[] = yield all( + nodeEntries + ); + + const networkSwitchPayload = processedNodes.reduce( + (accu, currNode) => ({ + nodeStats: { ...accu.nodeStats, [currNode.nodeId]: currNode.stats }, + workers: { ...accu.workers, ...currNode.workers } + }), + {} as NetworkSwitchSucceededAction['payload'] + ); + + yield put(networkSwitchSucceeded(networkSwitchPayload)); } function* handleNodeCallRequests(): SagaIterator { @@ -100,7 +165,9 @@ function* handleNodeCallRequests(): SagaIterator { while (true) { const { payload }: NodeCallRequestedAction = yield take(requestChan); // check if the app is offline - + if (yield select(getOffline)) { + yield call(delay, 2000); + } // wait until its back online // get an available nodeId to put the action to the channel @@ -148,7 +215,11 @@ function* handleCallTimeouts({ } } -function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) { +/** + * @description polls the offline state of a node, then returns control to caller when it comes back online + * @param {string} nodeId + */ +function* checkNodeConnectivity(nodeId: string, poll: boolean = true) { const nodeConfig: NodeConfig = yield select(getNodeById, nodeId); while (true) { try { @@ -159,28 +230,36 @@ function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) { }); if (lb) { console.log(`${nodeId} online!`); - yield put(nodeOnline({ nodeId })); - - // check if all methods are available after this node is online - const isAllMethodsAvailable: boolean = yield select(getAllMethodsAvailable); - - // if they are, put app in online state - if (isAllMethodsAvailable) { - const appIsOffline: boolean = yield select(getOffline); - if (appIsOffline) { - yield put(toggleOffline()); - } - } + return true; } } catch (error) { + if (!poll) { + return false; + } yield call(delay, 5000); console.info(error); } - console.log(`${nodeId} still offline`); } } +function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) { + yield call(checkNodeConnectivity, nodeId); + + yield put(nodeOnline({ nodeId })); + + // check if all methods are available after this node is online + const isAllMethodsAvailable: boolean = yield select(getAllMethodsAvailable); + + // if they are, put app in online state + if (isAllMethodsAvailable) { + const appIsOffline: boolean = yield select(getOffline); + if (appIsOffline) { + yield put(toggleOffline()); + } + } +} + function* spawnWorker(thisId: string, nodeId: string, chan: IChannels[string]) { /** * @description used to differentiate between errors from worker code vs a network call error @@ -287,7 +366,7 @@ function* flushHandler(_: BalancerFlushAction): SagaIterator { export function* nodeBalancer() { yield all([ - call(initAndChannelNodePool), + call(networkSwitch), takeEvery(TypeKeys.NODE_OFFLINE, watchOfflineNode), fork(handleNodeCallRequests), takeEvery(TypeKeys.NODE_CALL_TIMEOUT, handleCallTimeouts), diff --git a/common/selectors/config/nodes.ts b/common/selectors/config/nodes.ts index 1ef91c93..e572168d 100644 --- a/common/selectors/config/nodes.ts +++ b/common/selectors/config/nodes.ts @@ -9,7 +9,8 @@ import { StaticNodeConfig, StaticNodeId, Web3NodeConfig, - StaticNodeWithWeb3Id + StaticNodeWithWeb3Id, + NodeConfig } from 'types/node'; const getConfig = (state: AppState) => state.config; @@ -49,6 +50,22 @@ export const getNodeById = (state: AppState, nodeId: string) => ? getStaticNodeFromId(state, nodeId) : getCustomNodeFromId(state, nodeId); +const getNodesOfNetworkId = (nodes: { [key: string]: NodeConfig }, networkId: string) => { + const allNodesOfNetworkId: { [key: string]: NodeConfig } = {}; + return Object.entries(nodes).reduce((allNodes, [currNodeId, currNodeConfig]) => { + if (currNodeConfig.network !== networkId) { + return allNodes; + } + return { ...allNodes, [currNodeId]: currNodeConfig }; + }, allNodesOfNetworkId); +}; + +export const getAllNodesOfNetworkId = (state: AppState, networkId: string) => { + const staticNodesOfNetwork = getNodesOfNetworkId(getStaticNodeConfigs(state), networkId); + const customNodesOfNetwork = getNodesOfNetworkId(getCustomNodeConfigs(state), networkId); + return { ...staticNodesOfNetwork, ...customNodesOfNetwork }; +}; + export const isStaticNodeId = (state: AppState, nodeId: string): nodeId is StaticNodeWithWeb3Id => Object.keys(getStaticNodeConfigs(state)).includes(nodeId); diff --git a/common/selectors/nodeBalancer/index.ts b/common/selectors/nodeBalancer/index.ts index 36622e96..5862d43a 100644 --- a/common/selectors/nodeBalancer/index.ts +++ b/common/selectors/nodeBalancer/index.ts @@ -2,6 +2,7 @@ import { AppState } from 'reducers'; import { State as NodeBalancerState, INodeStats } from 'reducers/nodeBalancer/nodes'; import { Omit } from 'react-redux'; import { NodeCall } from 'actions/nodeBalancer'; +import { getNodeById } from 'selectors/config'; const allMethods = [ 'client', @@ -18,7 +19,7 @@ const allMethods = [ ]; export const getNodeBalancer = (state: AppState) => state.nodeBalancer; -export const getNodes = (state: AppState) => getNodeBalancer(state).nodes; +export const getNodesState = (state: AppState) => getNodeBalancer(state).nodes; export type AvailableNodes = { [nodeId in keyof NodeBalancerState]: Omit & { @@ -31,7 +32,7 @@ export type AvailableNodes = { * @param state */ export const getAvailableNodes = (state: AppState): AvailableNodes => { - const nodes = getNodes(state); + const nodes = getNodesState(state); const initialState: AvailableNodes = {}; const isAvailable = (node: NodeBalancerState[string]): node is AvailableNodes['string'] => @@ -124,4 +125,4 @@ export const getWorkersById = (state: AppState, workerId: string) => getWorkers( export const getNodeStatsById = ( state: AppState, nodeId: string -): Readonly | undefined => getNodes(state)[nodeId]; +): Readonly | undefined => getNodesState(state)[nodeId];