diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index ac4e2f683..7ce93c8cd 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -161,12 +161,34 @@ proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime, if checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch): return true - info "Slot start", - slot = shortLog(wallSlot.slot), - attestationIn = vc.getDurationToNextAttestation(wallSlot.slot), - blockIn = vc.getDurationToNextBlock(wallSlot.slot), - validators = vc.attachedValidators[].count(), - delay = shortLog(delay) + if len(vc.beaconNodes) > 1: + let + counts = vc.getNodeCounts() + # Good nodes are nodes which can be used for ALL the requests. + goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)] + # Viable nodes are nodes which can be used only SOME of the requests. + viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] + + counts.data[int(RestBeaconNodeStatus.NotSynced)] + + counts.data[int(RestBeaconNodeStatus.Compatible)] + # Bad nodes are nodes which can't be used at all. + badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] + + counts.data[int(RestBeaconNodeStatus.Online)] + + counts.data[int(RestBeaconNodeStatus.Incompatible)] + info "Slot start", + slot = shortLog(wallSlot.slot), + attestationIn = vc.getDurationToNextAttestation(wallSlot.slot), + blockIn = vc.getDurationToNextBlock(wallSlot.slot), + validators = vc.attachedValidators[].count(), + good_nodes = goodNodes, viable_nodes = viableNodes, bad_nodes = badNodes, + delay = shortLog(delay) + else: + info "Slot start", + slot = shortLog(wallSlot.slot), + attestationIn = vc.getDurationToNextAttestation(wallSlot.slot), + blockIn = vc.getDurationToNextBlock(wallSlot.slot), + validators = vc.attachedValidators[].count(), + node_status = $vc.beaconNodes[0].status, + delay = shortLog(delay) return false diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 6d8b00ea5..2042ae159 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -36,6 +36,14 @@ type status*: ApiOperation data*: seq[ApiNodeResponse[T]] +const + ViableNodeStatus = {RestBeaconNodeStatus.Compatible, + RestBeaconNodeStatus.NotSynced, + RestBeaconNodeStatus.OptSynced, + RestBeaconNodeStatus.Synced} + NotSyncedStatus = {RestBeaconNodeStatus.NotSynced, + RestBeaconNodeStatus.OptSynced} + proc `$`*(strategy: ApiStrategyKind): string = case strategy of ApiStrategyKind.First: @@ -81,10 +89,13 @@ template firstSuccessParallel*( vc: ValidatorClientRef, responseType: typedesc, timeout: Duration, + statuses: set[RestBeaconNodeStatus], roles: set[BeaconNodeRole], body1, body2: untyped ): ApiResponse[responseType] = - var it {.inject.}: RestClientRef + var + it {.inject.}: RestClientRef + iterations = 0 var timerFut = if timeout != InfiniteDuration: @@ -97,9 +108,14 @@ template firstSuccessParallel*( var resultReady = false let onlineNodes = try: - if not isNil(timerFut): - await vc.waitOnlineNodes(timerFut, roles) - vc.onlineNodes(roles) + if iterations == 0: + # We are not going to wait for BNs if there some available. + await vc.waitNodes(timerFut, statuses, roles, false) + else: + # We get here only, if all the requests are failed. To avoid requests + # spam we going to wait for changes in BNs statuses. + await vc.waitNodes(timerFut, statuses, roles, true) + vc.filterNodes(statuses, roles) except CancelledError as exc: if not(isNil(timerFut)) and not(timerFut.finished()): await timerFut.cancelAndWait() @@ -166,7 +182,7 @@ template firstSuccessParallel*( else: ApiResponse[responseType].ok( Future[responseType](requestFut).read()) - status = + handlerStatus = try: body2 except CancelledError as exc: @@ -174,8 +190,7 @@ template firstSuccessParallel*( except CatchableError: raiseAssert("Response handler must not raise exceptions") - node.status = status - if apiResponse.isOk() and (status == RestBeaconNodeStatus.Online): + if apiResponse.isOk() and handlerStatus: retRes = apiResponse resultReady = true asyncSpawn lazyWait(pendingNodes, pendingRequests, timerFut) @@ -215,11 +230,18 @@ template firstSuccessParallel*( break if resultReady: break + + inc(iterations) retRes -template bestSuccess*(vc: ValidatorClientRef, responseType: typedesc, - timeout: Duration, bodyRequest, - bodyScore: untyped): ApiResponse[responseType] = +template bestSuccess*( + vc: ValidatorClientRef, + responseType: typedesc, + timeout: Duration, + statuses: set[RestBeaconNodeStatus], + roles: set[BeaconNodeRole], + bodyRequest, + bodyScore: untyped): ApiResponse[responseType] = var it {.inject.}: RestClientRef type BodyType = typeof(bodyRequest) @@ -231,9 +253,8 @@ template bestSuccess*(vc: ValidatorClientRef, responseType: typedesc, let onlineNodes = try: - if not isNil(timerFut): - await vc.waitOnlineNodes(timerFut) - vc.onlineNodes() + await vc.waitNodes(timerFut, statuses, roles, false) + vc.filterNodes(statuses, roles) except CancelledError as exc: if not(isNil(timerFut)) and not(timerFut.finished()): await timerFut.cancelAndWait() @@ -339,9 +360,14 @@ template bestSuccess*(vc: ValidatorClientRef, responseType: typedesc, else: ApiResponse[responseType].err("Unable to get best response") -template onceToAll*(vc: ValidatorClientRef, responseType: typedesc, - timeout: Duration, roles: set[BeaconNodeRole], - body: untyped): ApiResponseSeq[responseType] = +template onceToAll*( + vc: ValidatorClientRef, + responseType: typedesc, + timeout: Duration, + statuses: set[RestBeaconNodeStatus], + roles: set[BeaconNodeRole], + body: untyped + ): ApiResponseSeq[responseType] = var it {.inject.}: RestClientRef type BodyType = typeof(body) @@ -353,9 +379,8 @@ template onceToAll*(vc: ValidatorClientRef, responseType: typedesc, let onlineNodes = try: - if not isNil(timerFut): - await vc.waitOnlineNodes(timerFut, roles) - vc.onlineNodes(roles) + await vc.waitNodes(timerFut, statuses, roles, false) + vc.filterNodes(statuses, roles) except CancelledError as exc: if not(isNil(timerFut)) and not(timerFut.finished()): await timerFut.cancelAndWait() @@ -459,12 +484,19 @@ template onceToAll*(vc: ValidatorClientRef, responseType: typedesc, ApiResponseSeq[responseType](status: status, data: responses) -template firstSuccessSequential*(vc: ValidatorClientRef, respType: typedesc, - timeout: Duration, - roles: set[BeaconNodeRole], body: untyped, - handlers: untyped): untyped = +template firstSuccessSequential*( + vc: ValidatorClientRef, + respType: typedesc, + timeout: Duration, + statuses: set[RestBeaconNodeStatus], + roles: set[BeaconNodeRole], + body: untyped, + handlers: untyped + ): untyped = doAssert(timeout != ZeroDuration) - var it {.inject.}: RestClientRef + var + it {.inject.}: RestClientRef + iterations = 0 var timerFut = if timeout != InfiniteDuration: @@ -472,15 +504,19 @@ template firstSuccessSequential*(vc: ValidatorClientRef, respType: typedesc, else: nil - var iterationsCount = 0 - while true: let onlineNodes = try: - await vc.waitOnlineNodes(timerFut, roles) - vc.onlineNodes(roles) + if iterations == 0: + # We are not going to wait for BNs if there some available. + await vc.waitNodes(timerFut, statuses, roles, false) + else: + # We get here only, if all the requests are failed. To avoid requests + # spam we going to wait for changes in BNs statuses. + await vc.waitNodes(timerFut, statuses, roles, true) + vc.filterNodes(statuses, roles) except CancelledError as exc: - # waitOnlineNodes do not cancel `timoutFuture`. + # waitNodes do not cancel `timoutFuture`. if not(isNil(timerFut)) and not(timerFut.finished()): await timerFut.cancelAndWait() raise exc @@ -493,8 +529,8 @@ template firstSuccessSequential*(vc: ValidatorClientRef, respType: typedesc, # `onlineNodes` sequence is empty only if operation timeout exceeded. break - if iterationsCount != 0: - debug "Request got failed", iterations_count = iterationsCount + if iterations != 0: + debug "Request got failed", iterations_count = iterations var exitNow = false @@ -539,6 +575,7 @@ template firstSuccessSequential*(vc: ValidatorClientRef, respType: typedesc, # This case should not happen. ApiOperation.Failure + var handlerStatus = false block: let apiResponse {.inject.} = block: @@ -559,16 +596,14 @@ template firstSuccessSequential*(vc: ValidatorClientRef, respType: typedesc, # finished, and `Failure` processed when Future is finished. ApiResponse[respType].err("Unexpected error") - let status = + handlerStatus = try: handlers except CatchableError: raiseAssert("Response handler must not raise exceptions") - node.status = status - if resOp == ApiOperation.Success: - if node.status == RestBeaconNodeStatus.Online: + if handlerStatus: exitNow = true break else: @@ -610,48 +645,67 @@ proc getProposerDuties*( strategy = $strategy const ErrorMessage = "Unable to retrieve proposer duties" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel(RestResponse[GetProposerDutiesResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getProposerDuties(it, epoch)): if apiResponse.isErr(): trace ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data of ApiStrategyKind.Priority: vc.firstSuccessSequential(RestResponse[GetProposerDutiesResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getProposerDuties(it, epoch)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -661,21 +715,30 @@ proc getProposerDuties*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc getAttesterDuties*( vc: ValidatorClientRef, @@ -688,49 +751,67 @@ proc getAttesterDuties*( strategy = $strategy const ErrorMessage = "Unable to retrieve attester duties" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel(RestResponse[GetAttesterDutiesResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getAttesterDuties(it, epoch, validators)): if apiResponse.isErr(): trace ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data of ApiStrategyKind.Priority: vc.firstSuccessSequential(RestResponse[GetAttesterDutiesResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getAttesterDuties(it, epoch, validators)): if apiResponse.isErr(): - debug ErrorMessage, endpoint = node, - error = apiResponse.error() - RestBeaconNodeStatus.Offline + debug ErrorMessage, endpoint = node, error = apiResponse.error() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -740,20 +821,30 @@ proc getAttesterDuties*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline - raise newException(ValidatorApiError, ErrorMessage) + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc getSyncCommitteeDuties*( vc: ValidatorClientRef, @@ -766,48 +857,68 @@ proc getSyncCommitteeDuties*( strategy = $strategy const ErrorMessage = "Unable to retrieve sync committee duties" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( - RestResponse[GetSyncCommitteeDutiesResponse], SlotDuration, - {BeaconNodeRole.Duties}, getSyncCommitteeDuties(it, epoch, validators)): + RestResponse[GetSyncCommitteeDutiesResponse], + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + getSyncCommitteeDuties(it, epoch, validators)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data of ApiStrategyKind.Priority: vc.firstSuccessSequential(RestResponse[GetSyncCommitteeDutiesResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getSyncCommitteeDuties(it, epoch, validators)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -817,21 +928,30 @@ proc getSyncCommitteeDuties*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc getForkSchedule*( vc: ValidatorClientRef, @@ -842,40 +962,54 @@ proc getForkSchedule*( strategy = $strategy const ErrorMessage = "Unable to retrieve fork schedule" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel(RestResponse[GetForkScheduleResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getForkSchedule(it)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data.data of ApiStrategyKind.Priority: vc.firstSuccessSequential(RestResponse[GetForkScheduleResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getForkSchedule(it)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -885,12 +1019,17 @@ proc getForkSchedule*( of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline - raise newException(ValidatorApiError, ErrorMessage) + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc getHeadBlockRoot*( vc: ValidatorClientRef, @@ -903,49 +1042,66 @@ proc getHeadBlockRoot*( let blockIdent = BlockIdent.init(BlockIdentType.Head) const ErrorMessage = "Unable to retrieve head block's root" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel(RestResponse[GetBlockRootResponse], SlotDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteeData}, getBlockRoot(it, blockIdent)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 404: debug ResponseNotFoundError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.NotFound)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestResponse[GetBlockRootResponse], SlotDuration, + vc.firstSuccessSequential(RestResponse[GetBlockRootResponse], + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteeData}, getBlockRoot(it, blockIdent)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -955,20 +1111,29 @@ proc getHeadBlockRoot*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 404: debug ResponseNotFoundError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.NotFound)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline - raise newException(ValidatorApiError, ErrorMessage) + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc getValidators*( vc: ValidatorClientRef, @@ -982,48 +1147,66 @@ proc getValidators*( let stateIdent = StateIdent.init(StateIdentType.Head) const ErrorMessage = "Unable to retrieve head state's validator information" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel(RestResponse[GetStateValidatorsResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getStateValidators(it, stateIdent, id)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 404: debug ResponseNotFoundError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.NotFound)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data.data of ApiStrategyKind.Priority: vc.firstSuccessSequential(RestResponse[GetStateValidatorsResponse], - SlotDuration, {BeaconNodeRole.Duties}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, getStateValidators(it, stateIdent, id)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -1033,21 +1216,29 @@ proc getValidators*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 404: debug ResponseNotFoundError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.NotFound)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc produceAttestationData*( vc: ValidatorClientRef, @@ -1060,51 +1251,70 @@ proc produceAttestationData*( strategy = $strategy const ErrorMessage = "Unable to retrieve attestation data" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( RestResponse[ProduceAttestationDataResponse], - OneThirdDuration, {BeaconNodeRole.AttestationData}, + OneThirdDuration, + ViableNodeStatus, + {BeaconNodeRole.AttestationData}, produceAttestationData(it, slot, committee_index)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data.data of ApiStrategyKind.Priority: vc.firstSuccessSequential( RestResponse[ProduceAttestationDataResponse], - OneThirdDuration, {BeaconNodeRole.AttestationData}, + OneThirdDuration, + ViableNodeStatus, + {BeaconNodeRole.AttestationData}, produceAttestationData(it, slot, committee_index)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -1114,21 +1324,30 @@ proc produceAttestationData*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc submitPoolAttestations*( vc: ValidatorClientRef, @@ -1142,44 +1361,63 @@ proc submitPoolAttestations*( const ErrorMessage = "Unable to submit attestation" NoErrorMessage = "Attestation was sucessfully published" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: - let res = vc.firstSuccessParallel(RestPlainResponse, SlotDuration, + let res = vc.firstSuccessParallel(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.AttestationPublish}, submitPoolAttestations(it, data)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace NoErrorMessage, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Incompatible + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return true of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, SlotDuration, + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.AttestationPublish}, submitPoolAttestations(it, data)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -1188,18 +1426,27 @@ proc submitPoolAttestations*( return true of 400: debug ResponseInvalidError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Incompatible + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc submitPoolSyncCommitteeSignature*( vc: ValidatorClientRef, @@ -1220,44 +1467,65 @@ proc submitPoolSyncCommitteeSignature*( const ErrorMessage = "Unable to submit sync committee message" NoErrorMessage = "Sync committee message was successfully published" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( - RestPlainResponse, SlotDuration, {BeaconNodeRole.SyncCommitteePublish}, + RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.SyncCommitteePublish}, submitPoolSyncCommitteeSignatures(it, @[restData])): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status of 200: trace NoErrorMessage, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Incompatible + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return true of ApiStrategyKind.Priority: vc.firstSuccessSequential( - RestPlainResponse, SlotDuration, {BeaconNodeRole.SyncCommitteePublish}, + RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.SyncCommitteePublish}, submitPoolSyncCommitteeSignatures(it, @[restData])): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status @@ -1266,18 +1534,27 @@ proc submitPoolSyncCommitteeSignature*( return true of 400: debug ResponseInvalidError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Incompatible + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, response_error = response.getIndexedErrorMessage() - RestBeaconNodeStatus.Offline + endpoint = node, + response_error = response.getIndexedErrorMessage() + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc getAggregatedAttestation*( vc: ValidatorClientRef, @@ -1290,46 +1567,62 @@ proc getAggregatedAttestation*( strategy = $strategy const ErrorMessage = "Unable to retrieve aggregated attestation data" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( RestResponse[GetAggregatedAttestationResponse], - OneThirdDuration, {BeaconNodeRole.AggregatedData}, + OneThirdDuration, + ViableNodeStatus, + {BeaconNodeRole.AggregatedData}, getAggregatedAttestation(it, root, slot)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data.data of ApiStrategyKind.Priority: vc.firstSuccessSequential( RestResponse[GetAggregatedAttestationResponse], - OneThirdDuration, {BeaconNodeRole.AggregatedData}, + OneThirdDuration, + ViableNodeStatus, + {BeaconNodeRole.AggregatedData}, getAggregatedAttestation(it, root, slot)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1339,17 +1632,23 @@ proc getAggregatedAttestation*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc produceSyncCommitteeContribution*( vc: ValidatorClientRef, @@ -1363,46 +1662,62 @@ proc produceSyncCommitteeContribution*( strategy = $strategy const ErrorMessage = "Unable to retrieve sync committee contribution data" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( - RestResponse[ProduceSyncCommitteeContributionResponse], OneThirdDuration, + RestResponse[ProduceSyncCommitteeContributionResponse], + OneThirdDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteeData}, produceSyncCommitteeContribution(it, slot, subcommitteeIndex, root)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data.data of ApiStrategyKind.Priority: vc.firstSuccessSequential( - RestResponse[ProduceSyncCommitteeContributionResponse], OneThirdDuration, + RestResponse[ProduceSyncCommitteeContributionResponse], + OneThirdDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteeData}, produceSyncCommitteeContribution(it, slot, subcommitteeIndex, root)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1412,17 +1727,23 @@ proc produceSyncCommitteeContribution*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc publishAggregateAndProofs*( vc: ValidatorClientRef, @@ -1436,47 +1757,63 @@ proc publishAggregateAndProofs*( const ErrorMessage = "Unable to publish aggregate and proofs" NoErrorMessage = "Aggregate and proofs was sucessfully published" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: - let res = vc.firstSuccessParallel(RestPlainResponse, SlotDuration, + let res = vc.firstSuccessParallel(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.AggregatedPublish}, publishAggregateAndProofs(it, data)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace NoErrorMessage, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return true of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, SlotDuration, + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.AggregatedPublish}, publishAggregateAndProofs(it, data)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1487,19 +1824,25 @@ proc publishAggregateAndProofs*( debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc publishContributionAndProofs*( vc: ValidatorClientRef, @@ -1513,48 +1856,63 @@ proc publishContributionAndProofs*( const ErrorMessage = "Unable to publish contribution and proofs" NoErrorMessage = "Contribution and proofs were successfully published" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: - let res = vc.firstSuccessParallel(RestPlainResponse, SlotDuration, + let res = vc.firstSuccessParallel(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteePublish}, publishContributionAndProofs(it, data)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace NoErrorMessage, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return true of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, SlotDuration, + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteePublish}, publishContributionAndProofs(it, data)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1565,18 +1923,25 @@ proc publishContributionAndProofs*( debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - raise newException(ValidatorApiError, ErrorMessage) + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc produceBlockV2*( vc: ValidatorClientRef, @@ -1590,50 +1955,69 @@ proc produceBlockV2*( strategy = $strategy const ErrorMessage = "Unable to retrieve block data" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( RestResponse[ProduceBlockResponseV2], - SlotDuration, {BeaconNodeRole.BlockProposalData}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.BlockProposalData}, produceBlockV2(it, slot, randao_reveal, graffiti)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data of ApiStrategyKind.Priority: vc.firstSuccessSequential( RestResponse[ProduceBlockResponseV2], - SlotDuration, {BeaconNodeRole.BlockProposalData}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.BlockProposalData}, produceBlockV2(it, slot, randao_reveal, graffiti)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1643,21 +2027,30 @@ proc produceBlockV2*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc publishBlock*( vc: ValidatorClientRef, @@ -1672,11 +2065,14 @@ proc publishBlock*( BlockPublished = "Block was successfully published" BlockBroadcasted = "Block not passed validation, but still published" ErrorMessage = "Unable to publish block" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = block: - vc.firstSuccessParallel(RestPlainResponse, SlotDuration, + vc.firstSuccessParallel(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.BlockProposalPublish}): case data.kind of ConsensusFork.Phase0: @@ -1688,7 +2084,8 @@ proc publishBlock*( of ConsensusFork.Capella: publishBlock(it, data.capellaData) of ConsensusFork.EIP4844: - debugRaiseAssert $eip4844ImplementationMissing & ": validator_client/api.nim:publishBlock (1)" + debugRaiseAssert $eip4844ImplementationMissing & + ": validator_client/api.nim:publishBlock (1)" let f = newFuture[RestPlainResponse]("") f.fail(new RestError) f @@ -1696,42 +2093,56 @@ proc publishBlock*( do: if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace BlockPublished, endpoint = node - RestBeaconNodeStatus.Online + true of 202: debug BlockBroadcasted, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return true of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, SlotDuration, + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.BlockProposalPublish}): case data.kind of ConsensusFork.Phase0: @@ -1743,14 +2154,17 @@ proc publishBlock*( of ConsensusFork.Capella: publishBlock(it, data.capellaData) of ConsensusFork.EIP4844: - debugRaiseAssert $eip4844ImplementationMissing & ": validator_client/api.nim:publishBlock (2)" + debugRaiseAssert $eip4844ImplementationMissing & + ": validator_client/api.nim:publishBlock (2)" let f = newFuture[RestPlainResponse]("") f.fail(new RestError) f do: if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1764,24 +2178,33 @@ proc publishBlock*( debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc produceBlindedBlock*( vc: ValidatorClientRef, @@ -1795,50 +2218,69 @@ proc produceBlindedBlock*( strategy = $strategy const ErrorMessage = "Unable to retrieve block data" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = vc.firstSuccessParallel( RestResponse[ProduceBlindedBlockResponse], - SlotDuration, {BeaconNodeRole.BlockProposalData}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.BlockProposalData}, produceBlindedBlock(it, slot, randao_reveal, graffiti)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace ResponseSuccess, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return res.get().data of ApiStrategyKind.Priority: vc.firstSuccessSequential( RestResponse[ProduceBlindedBlockResponse], - SlotDuration, {BeaconNodeRole.BlockProposalData}, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.BlockProposalData}, produceBlindedBlock(it, slot, randao_reveal, graffiti)): if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1848,21 +2290,30 @@ proc produceBlindedBlock*( of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc publishBlindedBlock*( vc: ValidatorClientRef, @@ -1877,11 +2328,14 @@ proc publishBlindedBlock*( BlockPublished = "Block was successfully published" BlockBroadcasted = "Block not passed validation, but still published" ErrorMessage = "Unable to publish block" + var failures: seq[ApiNodeFailure] case strategy of ApiStrategyKind.First, ApiStrategyKind.Best: let res = block: - vc.firstSuccessParallel(RestPlainResponse, SlotDuration, + vc.firstSuccessParallel(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.BlockProposalPublish}): case data.kind of ConsensusFork.Phase0: @@ -1893,49 +2347,64 @@ proc publishBlindedBlock*( of ConsensusFork.Capella: publishBlindedBlock(it, data.capellaData) of ConsensusFork.EIP4844: - debugRaiseAssert $eip4844ImplementationMissing & ": validator_client/api.nim:publishBlindedBlock (1)" + debugRaiseAssert $eip4844ImplementationMissing & + ": validator_client/api.nim:publishBlindedBlock (1)" let f = newFuture[RestPlainResponse]("") f.fail(new RestError) f do: if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: of 200: trace BlockPublished, endpoint = node - RestBeaconNodeStatus.Online + true of 202: debug BlockBroadcasted, endpoint = node - RestBeaconNodeStatus.Online + true of 400: debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + if res.isErr(): - raise newException(ValidatorApiError, res.error()) + raise (ref ValidatorApiError)(msg: res.error, data: failures) return true of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, SlotDuration, + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.BlockProposalPublish}): case data.kind of ConsensusFork.Phase0: @@ -1947,14 +2416,17 @@ proc publishBlindedBlock*( of ConsensusFork.Capella: publishBlindedBlock(it, data.capellaData) of ConsensusFork.EIP4844: - debugRaiseAssert $eip4844ImplementationMissing & ": validator_client/api.nim:publishBlindedBlock (2)" + debugRaiseAssert $eip4844ImplementationMissing & + ": validator_client/api.nim:publishBlindedBlock (2)" let f = newFuture[RestPlainResponse]("") f.fail(new RestError) f do: if apiResponse.isErr(): debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false else: let response = apiResponse.get() case response.status: @@ -1968,31 +2440,42 @@ proc publishBlindedBlock*( debug ResponseInvalidError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Incompatible + node.status = RestBeaconNodeStatus.Incompatible + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false of 500: debug ResponseInternalError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false of 503: debug ResponseNoSyncError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced + if node.status notin NotSyncedStatus: + node.status = RestBeaconNodeStatus.NotSynced + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false else: debug ResponseUnexpectedError, response_code = response.status, endpoint = node, response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline + node.status = RestBeaconNodeStatus.Offline + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false - raise newException(ValidatorApiError, ErrorMessage) + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) proc prepareBeaconCommitteeSubnet*( vc: ValidatorClientRef, data: seq[RestCommitteeSubscription], ): Future[int] {.async.} = logScope: request = "prepareBeaconCommitteeSubnet" - let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + let resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.AggregatedData}, prepareBeaconCommitteeSubnet(it, data)) if len(resp.data) == 0: @@ -2034,7 +2517,9 @@ proc prepareSyncCommitteeSubnets*( data: seq[RestSyncCommitteeSubscription], ): Future[int] {.async.} = logScope: request = "prepareSyncCommitteeSubnet" - let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + let resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.SyncCommitteeData}, prepareSyncCommitteeSubnets(it, data)) if len(resp.data) == 0: @@ -2075,7 +2560,9 @@ proc getValidatorsActivity*( validators: seq[ValidatorIndex] ): Future[GetValidatorsActivityResponse] {.async.} = logScope: request = "getValidatorsActivity" - let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + let resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.Duties}, getValidatorsActivity(it, epoch, validators)) case resp.status @@ -2177,7 +2664,9 @@ proc prepareBeaconProposer*( data: seq[PrepareBeaconProposer] ): Future[int] {.async.} = logScope: request = "prepareBeaconProposer" - let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + let resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.BlockProposalPublish}, prepareBeaconProposer(it, data)) if len(resp.data) == 0: @@ -2218,7 +2707,9 @@ proc registerValidator*( data: seq[SignedValidatorRegistrationV1] ): Future[int] {.async.} = logScope: request = "registerValidators" - let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + let resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.BlockProposalPublish}, registerValidator(it, data)) if len(resp.data) == 0: @@ -2259,7 +2750,9 @@ proc getValidatorsLiveness*( validators: seq[ValidatorIndex] ): Future[GetValidatorsLivenessResponse] {.async.} = logScope: request = "getValidatorsActivity" - let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + let resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, {BeaconNodeRole.Duties}, getValidatorsLiveness(it, epoch, validators)) case resp.status @@ -2333,7 +2826,8 @@ proc getValidatorsLiveness*( response_code = response.status, endpoint = apiResponse.node, response_error = response.getErrorMessage() - apiResponse.node.status = RestBeaconNodeStatus.NotSynced + if apiResponse.node.status notin NotSyncedStatus: + apiResponse.node.status = RestBeaconNodeStatus.NotSynced continue else: debug "Server reports unexpected error code", diff --git a/beacon_chain/validator_client/attestation_service.nim b/beacon_chain/validator_client/attestation_service.nim index 480619478..9c48f9f50 100644 --- a/beacon_chain/validator_client/attestation_service.nim +++ b/beacon_chain/validator_client/attestation_service.nim @@ -81,11 +81,12 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, let res = try: await vc.submitPoolAttestations(@[attestation], ApiStrategyKind.First) - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to publish attestation", attestation = shortLog(attestation), validator = shortLog(validator), - validator_index = vindex + validator_index = vindex, + reason = exc.getFailureReason() return false except CancelledError as exc: debug "Attestation publishing process was interrupted" @@ -160,11 +161,12 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, let res = try: await vc.publishAggregateAndProofs(@[signedProof], ApiStrategyKind.First) - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to publish aggregated attestation", attestation = shortLog(signedProof.message.aggregate), validator = shortLog(validator), - validator_index = vindex + validator_index = vindex, + reason = exc.getFailureReason() return false except CancelledError as exc: debug "Publish aggregate and proofs request was interrupted" @@ -287,9 +289,10 @@ proc produceAndPublishAggregates(service: AttestationServiceRef, try: await vc.getAggregatedAttestation(slot, attestationRoot, ApiStrategyKind.Best) - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to get aggregated attestation data", slot = slot, - attestation_root = shortLog(attestationRoot) + attestation_root = shortLog(attestationRoot), + reason = exc.getFailureReason() return except CancelledError as exc: debug "Aggregated attestation request was interrupted" @@ -360,9 +363,10 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef, let ad = try: await service.produceAndPublishAttestations(slot, committee_index, duties) - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to proceed attestations", slot = slot, - committee_index = committee_index, duties_count = len(duties) + committee_index = committee_index, duties_count = len(duties), + reason = exc.getFailureReason() return except CancelledError as exc: debug "Publish attestation request was interrupted" diff --git a/beacon_chain/validator_client/block_service.nim b/beacon_chain/validator_client/block_service.nim index 78c92ec05..4c4a41204 100644 --- a/beacon_chain/validator_client/block_service.nim +++ b/beacon_chain/validator_client/block_service.nim @@ -38,8 +38,8 @@ proc produceBlock( try: await vc.produceBlockV2(slot, randao_reveal, graffiti, ApiStrategyKind.Best) - except ValidatorApiError: - error "Unable to retrieve block data" + except ValidatorApiError as exc: + error "Unable to retrieve block data", reason = exc.getFailureReason() return Opt.none(PreparedBeaconBlock) except CancelledError as exc: error "Block data production has been interrupted" @@ -69,7 +69,8 @@ proc produceBlindedBlock( await vc.produceBlindedBlock(slot, randao_reveal, graffiti, ApiStrategyKind.Best) except ValidatorApiError as exc: - error "Unable to retrieve blinded block data", error_msg = exc.msg + error "Unable to retrieve blinded block data", error_msg = exc.msg, + reason = exc.getFailureReason() return Opt.none(PreparedBlindedBeaconBlock) except CancelledError as exc: error "Blinded block data production has been interrupted" @@ -214,8 +215,9 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot, try: debug "Sending blinded block" await vc.publishBlindedBlock(signedBlock, ApiStrategyKind.First) - except ValidatorApiError: - error "Unable to publish blinded block" + except ValidatorApiError as exc: + error "Unable to publish blinded block", + reason = exc.getFailureReason() return except CancelledError as exc: debug "Blinded block publication has been interrupted" @@ -275,8 +277,8 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot, try: debug "Sending block" await vc.publishBlock(signedBlock, ApiStrategyKind.First) - except ValidatorApiError: - error "Unable to publish block" + except ValidatorApiError as exc: + error "Unable to publish block", reason = exc.getFailureReason() return except CancelledError as exc: debug "Block publication has been interrupted" diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 69c9be564..38f0acdad 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -64,7 +64,7 @@ type DutiesServiceRef* = ref object of ClientServiceRef FallbackServiceRef* = ref object of ClientServiceRef - onlineEvent*: AsyncEvent + changesEvent*: AsyncEvent ForkServiceRef* = ref object of ClientServiceRef @@ -127,7 +127,16 @@ type duties*: Table[Epoch, SyncCommitteeDuty] RestBeaconNodeStatus* {.pure.} = enum - Uninitalized, Offline, Incompatible, NotSynced, Online + Offline, ## BN is offline. + Online, ## BN is online, passed checkOnline() check. + Incompatible, ## BN configuration is NOT compatible with VC configuration. + Compatible, ## BN configuration is compatible with VC configuration. + NotSynced, ## BN is not in sync. + OptSynced, ## BN is optimistically synced (EL is not in sync). + Synced ## BN and EL are synced. + + BeaconNodesCounters* = object + data*: array[int(high(RestBeaconNodeStatus)) + 1, int] BeaconNodeServerRef* = ref BeaconNodeServer @@ -176,10 +185,18 @@ type validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1] rng*: ref HmacDrbgContext + ApiFailure* {.pure.} = enum + Communication, Invalid, NotFound, NotSynced, Internal, Unexpected + + ApiNodeFailure* = object + node*: BeaconNodeServerRef + failure*: ApiFailure + ValidatorClientRef* = ref ValidatorClient ValidatorClientError* = object of CatchableError ValidatorApiError* = object of ValidatorClientError + data*: seq[ApiNodeFailure] const DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)) @@ -225,6 +242,49 @@ proc `$`*(roles: set[BeaconNodeRole]): string = else: "{}" +proc `$`*(status: RestBeaconNodeStatus): string = + case status + of RestBeaconNodeStatus.Offline: "offline" + of RestBeaconNodeStatus.Online: "online" + of RestBeaconNodeStatus.Incompatible: "incompatible" + of RestBeaconNodeStatus.Compatible: "compatible" + of RestBeaconNodeStatus.NotSynced: "bn-unsynced" + of RestBeaconNodeStatus.OptSynced: "el-unsynced" + of RestBeaconNodeStatus.Synced: "synced" + +proc `$`*(failure: ApiFailure): string = + case failure + of ApiFailure.Communication: "Connection with beacon node has been lost" + of ApiFailure.Invalid: "Invalid response received from beacon node" + of ApiFailure.NotFound: "Beacon node did not found requested entity" + of ApiFailure.NotSynced: "Beacon node not in sync with network" + of ApiFailure.Internal: "Beacon node reports internal failure" + of ApiFailure.Unexpected: "Beacon node reports unexpected status" + +proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters = + var res = BeaconNodesCounters() + for node in vc.beaconNodes: inc(res.data[int(node.status)]) + res + +proc getFailureReason*(exc: ref ValidatorApiError): string = + var counts: array[int(high(ApiFailure)) + 1, int] + let errors = exc[].data + + if len(errors) > 1: + var maxFailure = + block: + var maxCount = -1 + var res = ApiFailure.Unexpected + for item in errors: + inc(counts[int(item.failure)]) + if counts[int(item.failure)] > maxCount: + maxCount = counts[int(item.failure)] + res = item.failure + res + $maxFailure + else: + $errors[0].failure + proc shortLog*(roles: set[BeaconNodeRole]): string = var r = "AGBSD" if BeaconNodeRole.AttestationData in roles: @@ -362,7 +422,8 @@ proc init*(t: typedesc[BeaconNodeServerRef], remote: Uri, let server = BeaconNodeServerRef( client: client, endpoint: $remote, index: index, roles: roles, logIdent: client.address.hostname & ":" & - Base10.toString(client.address.port) + Base10.toString(client.address.port), + status: RestBeaconNodeStatus.Offline ) ok(server) @@ -731,3 +792,7 @@ proc prepareRegistrationList*( incorrect_time = timed return registrations + +proc init*(t: typedesc[ApiNodeFailure], node: BeaconNodeServerRef, + failure: ApiFailure): ApiNodeFailure = + ApiNodeFailure(node: node, failure: failure) diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 9ab531a14..4af84f287 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -65,8 +65,9 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = let res = try: await vc.getValidators(idents, ApiStrategyKind.First) - except ValidatorApiError: - error "Unable to get head state's validator information" + except ValidatorApiError as exc: + error "Unable to get head state's validator information", + reason = exc.getFailureReason() return except CancelledError as exc: debug "Validator's indices processing was interrupted" @@ -138,8 +139,9 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef, let res = try: await vc.getAttesterDuties(epoch, indices, ApiStrategyKind.First) - except ValidatorApiError: - error "Unable to get attester duties", epoch = epoch + except ValidatorApiError as exc: + notice "Unable to get attester duties", epoch = epoch, + reason = exc.getFailureReason() return 0 except CancelledError as exc: debug "Attester duties processing was interrupted" @@ -271,8 +273,9 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, res = try: await vc.getSyncCommitteeDuties(epoch, indices, ApiStrategyKind.First) - except ValidatorApiError: - error "Unable to get sync committee duties", epoch = epoch + except ValidatorApiError as exc: + notice "Unable to get sync committee duties", epoch = epoch, + reason = exc.getFailureReason() return 0 except CancelledError as exc: debug "Sync committee duties processing was interrupted" @@ -502,9 +505,9 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} = else: debug "No relevant proposer duties received", slot = currentSlot, duties_count = len(duties) - except ValidatorApiError: - debug "Unable to get proposer duties", slot = currentSlot, - epoch = currentEpoch + except ValidatorApiError as exc: + notice "Unable to get proposer duties", slot = currentSlot, + epoch = currentEpoch, reason = exc.getFailureReason() except CancelledError as exc: debug "Proposer duties processing was interrupted" raise exc @@ -531,7 +534,7 @@ proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} = except ValidatorApiError as exc: warn "Unable to prepare beacon proposers", slot = currentSlot, epoch = currentEpoch, err_name = exc.name, - err_msg = exc.msg + err_msg = exc.msg, reason = exc.getFailureReason() 0 except CancelledError as exc: debug "Beacon proposer preparation processing was interrupted" @@ -575,7 +578,7 @@ proc registerValidators*(service: DutiesServiceRef) {.async.} = except ValidatorApiError as exc: warn "Unable to register validators", slot = currentSlot, fork = genesisFork, err_name = exc.name, - err_msg = exc.msg + err_msg = exc.msg, reason = exc.getFailureReason() 0 except CancelledError as exc: debug "Validator registration was interrupted", slot = currentSlot, diff --git a/beacon_chain/validator_client/fallback_service.nim b/beacon_chain/validator_client/fallback_service.nim index 034514551..163da1060 100644 --- a/beacon_chain/validator_client/fallback_service.nim +++ b/beacon_chain/validator_client/fallback_service.nim @@ -12,88 +12,68 @@ const logScope: service = ServiceName -type - BeaconNodesCounters* = object - online*: int - offline*: int - uninitalized*: int - incompatible*: int - nosync*: int +proc nodesCount*(vc: ValidatorClientRef, + statuses: set[RestBeaconNodeStatus], + roles: set[BeaconNodeRole] = {}): int = + if len(roles) == 0: + vc.beaconNodes.countIt(it.status in statuses) + else: + vc.beaconNodes.countIt((it.roles * roles != {}) and (it.status in statuses)) -proc onlineNodes*(vc: ValidatorClientRef, +proc filterNodes*(vc: ValidatorClientRef, statuses: set[RestBeaconNodeStatus], roles: set[BeaconNodeRole] = {}): seq[BeaconNodeServerRef] = if len(roles) == 0: - vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online) + vc.beaconNodes.filterIt(it.status in statuses) else: vc.beaconNodes.filterIt((it.roles * roles != {}) and - (it.status == RestBeaconNodeStatus.Online)) + (it.status in statuses)) -proc onlineNodesCount*(vc: ValidatorClientRef, - roles: set[BeaconNodeRole] = {}): int = - if len(roles) == 0: - vc.beaconNodes.countIt(it.status == RestBeaconNodeStatus.Online) - else: - vc.beaconNodes.countIt((it.roles * roles != {}) and - (it.status == RestBeaconNodeStatus.Online)) +proc otherNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] = + vc.beaconNodes.filterIt(it.status != RestBeaconNodeStatus.Synced) -proc unusableNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] = - vc.beaconNodes.filterIt(it.status != RestBeaconNodeStatus.Online) +proc otherNodesCount*(vc: ValidatorClientRef): int = + vc.beaconNodes.countIt(it.status != RestBeaconNodeStatus.Synced) -proc unusableNodesCount*(vc: ValidatorClientRef): int = - vc.beaconNodes.countIt(it.status != RestBeaconNodeStatus.Online) - -proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters = - var res = BeaconNodesCounters() - for node in vc.beaconNodes: - case node.status - of RestBeaconNodeStatus.Uninitalized: - inc(res.uninitalized) - of RestBeaconNodeStatus.Offline: - inc(res.offline) - of RestBeaconNodeStatus.Incompatible: - inc(res.incompatible) - of RestBeaconNodeStatus.NotSynced: - inc(res.nosync) - of RestBeaconNodeStatus.Online: - inc(res.online) - res - -proc waitOnlineNodes*(vc: ValidatorClientRef, timeoutFut: Future[void] = nil, - roles: set[BeaconNodeRole] = {}) {.async.} = +proc waitNodes*(vc: ValidatorClientRef, timeoutFut: Future[void], + statuses: set[RestBeaconNodeStatus], + roles: set[BeaconNodeRole], waitChanges: bool) {.async.} = doAssert(not(isNil(vc.fallbackService))) + var iterations = 0 while true: - if vc.onlineNodesCount(roles) != 0: - break - else: - if vc.fallbackService.onlineEvent.isSet(): - vc.fallbackService.onlineEvent.clear() - warn "Connection with beacon node(s) has been lost", - online_nodes = vc.onlineNodesCount(), - unusable_nodes = vc.unusableNodesCount(), - total_nodes = len(vc.beaconNodes) - if isNil(timeoutFut): - await vc.fallbackService.onlineEvent.wait() - else: - let breakLoop = - block: - let waitFut = vc.fallbackService.onlineEvent.wait() - try: - discard await race(waitFut, timeoutFut) - except CancelledError as exc: - if not(waitFut.finished()): - await waitFut.cancelAndWait() - raise exc + if not(waitChanges) or (iterations != 0): + if vc.nodesCount(statuses, roles) != 0: + break + if vc.fallbackService.changesEvent.isSet(): + vc.fallbackService.changesEvent.clear() + + if isNil(timeoutFut): + await vc.fallbackService.changesEvent.wait() + else: + let breakLoop = + block: + let waitFut = vc.fallbackService.changesEvent.wait() + try: + discard await race(waitFut, timeoutFut) + except CancelledError as exc: if not(waitFut.finished()): await waitFut.cancelAndWait() - true - else: - false - if breakLoop: - break + raise exc -proc checkCompatible(vc: ValidatorClientRef, - node: BeaconNodeServerRef) {.async.} = + if not(waitFut.finished()): + await waitFut.cancelAndWait() + true + else: + false + if breakLoop: + break + + inc(iterations) + +proc checkCompatible( + vc: ValidatorClientRef, + node: BeaconNodeServerRef + ): Future[RestBeaconNodeStatus] {.async.} = logScope: endpoint = node let info = try: @@ -102,18 +82,17 @@ proc checkCompatible(vc: ValidatorClientRef, res.data.data except CancelledError as exc: debug "Configuration request was interrupted" - node.status = RestBeaconNodeStatus.Offline raise exc except RestError as exc: - debug "Unable to obtain beacon node's configuration", - error_name = exc.name, error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + debug "Unable to obtain beacon node's configuration", + error_name = exc.name, error_message = exc.msg + return RestBeaconNodeStatus.Offline except CatchableError as exc: - error "Unexpected exception", error_name = exc.name, - error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + error "Unexpected exception", error_name = exc.name, + error_message = exc.msg + return RestBeaconNodeStatus.Offline let genesis = try: @@ -122,18 +101,17 @@ proc checkCompatible(vc: ValidatorClientRef, res.data.data except CancelledError as exc: debug "Genesis request was interrupted" - node.status = RestBeaconNodeStatus.Offline raise exc except RestError as exc: - debug "Unable to obtain beacon node's genesis", - error_name = exc.name, error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + debug "Unable to obtain beacon node's genesis", + error_name = exc.name, error_message = exc.msg + return RestBeaconNodeStatus.Offline except CatchableError as exc: - error "Unexpected exception", error_name = exc.name, - error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + error "Unexpected exception", error_name = exc.name, + error_message = exc.msg + return RestBeaconNodeStatus.Offline let genesisFlag = (genesis != vc.beaconGenesis) let configFlag = @@ -160,18 +138,24 @@ proc checkCompatible(vc: ValidatorClientRef, info.DOMAIN_SELECTION_PROOF != DOMAIN_SELECTION_PROOF or info.DOMAIN_AGGREGATE_AND_PROOF != DOMAIN_AGGREGATE_AND_PROOF - if configFlag or genesisFlag: - node.status = RestBeaconNodeStatus.Incompatible - warn "Beacon node has incompatible configuration", - genesis_flag = genesisFlag, config_flag = configFlag - else: - info "Beacon node has compatible configuration" - node.config = some(info) - node.genesis = some(genesis) - node.status = RestBeaconNodeStatus.Online + let res = + if configFlag or genesisFlag: + if node.status != RestBeaconNodeStatus.Incompatible: + warn "Beacon node has incompatible configuration", + genesis_flag = genesisFlag, config_flag = configFlag + RestBeaconNodeStatus.Incompatible + else: + if node.status != RestBeaconNodeStatus.Compatible: + debug "Beacon node has compatible configuration" + node.config = some(info) + node.genesis = some(genesis) + RestBeaconNodeStatus.Compatible + return res -proc checkSync(vc: ValidatorClientRef, - node: BeaconNodeServerRef) {.async.} = +proc checkSync( + vc: ValidatorClientRef, + node: BeaconNodeServerRef + ): Future[RestBeaconNodeStatus] {.async.} = logScope: endpoint = node let syncInfo = try: @@ -180,20 +164,19 @@ proc checkSync(vc: ValidatorClientRef, res.data.data except CancelledError as exc: debug "Sync status request was interrupted" - node.status = RestBeaconNodeStatus.Offline raise exc except RestError as exc: - debug "Unable to obtain beacon node's sync status", - error_name = exc.name, error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + debug "Unable to obtain beacon node's sync status", + error_name = exc.name, error_message = exc.msg + return RestBeaconNodeStatus.Offline except CatchableError as exc: - error "Unexpected exception", error_name = exc.name, - error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + error "Unexpected exception", error_name = exc.name, + error_message = exc.msg + return RestBeaconNodeStatus.Offline node.syncInfo = some(syncInfo) - node.status = + let res = block: let optimistic = if syncInfo.is_optimistic.isNone(): @@ -203,20 +186,29 @@ proc checkSync(vc: ValidatorClientRef, if not(syncInfo.is_syncing) or (syncInfo.sync_distance < SYNC_TOLERANCE): if not(syncInfo.is_optimistic.get(false)): - info "Beacon node is in sync", sync_distance = syncInfo.sync_distance, - head_slot = syncInfo.head_slot, is_optimistic = optimistic - RestBeaconNodeStatus.Online + if node.status != RestBeaconNodeStatus.Synced: + info "Beacon node is in sync", + sync_distance = syncInfo.sync_distance, + head_slot = syncInfo.head_slot, is_optimistic = optimistic + RestBeaconNodeStatus.Synced else: - warn "Execution client not in sync (beacon node optimistically synced)", + if node.status != RestBeaconNodeStatus.OptSynced: + info "Execution client not in sync " & + "(beacon node optimistically synced)", + sync_distance = syncInfo.sync_distance, + head_slot = syncInfo.head_slot, is_optimistic = optimistic + RestBeaconNodeStatus.OptSynced + else: + if node.status != RestBeaconNodeStatus.NotSynced: + warn "Beacon node not in sync", sync_distance = syncInfo.sync_distance, head_slot = syncInfo.head_slot, is_optimistic = optimistic - RestBeaconNodeStatus.NotSynced - else: - warn "Beacon node not in sync", sync_distance = syncInfo.sync_distance, - head_slot = syncInfo.head_slot, is_optimistic = optimistic RestBeaconNodeStatus.NotSynced + return res -proc checkOnline(node: BeaconNodeServerRef) {.async.} = +proc checkOnline( + node: BeaconNodeServerRef + ): Future[RestBeaconNodeStatus] {.async.} = logScope: endpoint = node debug "Checking beacon node status" let agent = @@ -225,40 +217,60 @@ proc checkOnline(node: BeaconNodeServerRef) {.async.} = res.data.data except CancelledError as exc: debug "Status request was interrupted" - node.status = RestBeaconNodeStatus.Offline raise exc except RestError as exc: - debug "Unable to check beacon node's status", - error_name = exc.name, error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return + if node.status != RestBeaconNodeStatus.Offline: + debug "Unable to check beacon node's status", + error_name = exc.name, error_message = exc.msg + return RestBeaconNodeStatus.Offline except CatchableError as exc: - error "Unexpected exception", error_name = exc.name, - error_message = exc.msg - node.status = RestBeaconNodeStatus.Offline - return - info "Beacon node has been identified", agent = agent.version - node.ident = some(agent.version) - node.status = RestBeaconNodeStatus.Online + if node.status != RestBeaconNodeStatus.Offline: + error "Unexpected exception", error_name = exc.name, + error_message = exc.msg + return RestBeaconNodeStatus.Offline + if node.status != RestBeaconNodeStatus.Online: + debug "Beacon node has been identified", agent = agent.version + return RestBeaconNodeStatus.Online proc checkNode(vc: ValidatorClientRef, - node: BeaconNodeServerRef) {.async.} = - debug "Checking beacon node", endpoint = node - await node.checkOnline() - if node.status != RestBeaconNodeStatus.Online: - return - await vc.checkCompatible(node) - if node.status != RestBeaconNodeStatus.Online: - return - await vc.checkSync(node) + node: BeaconNodeServerRef): Future[bool] {.async.} = + let nstatus = node.status + debug "Checking beacon node", endpoint = node, status = node.status -proc checkNodes*(service: FallbackServiceRef) {.async.} = + if nstatus in {RestBeaconNodeStatus.Offline}: + let status = await node.checkOnline() + node.status = status + if status != RestBeaconNodeStatus.Online: + return nstatus != status + + if nstatus in {RestBeaconNodeStatus.Offline, + RestBeaconNodeStatus.Online, + RestBeaconNodeStatus.Incompatible}: + let status = await vc.checkCompatible(node) + node.status = status + if status != RestBeaconNodeStatus.Compatible: + return nstatus != status + + if nstatus in {RestBeaconNodeStatus.Offline, + RestBeaconNodeStatus.Online, + RestBeaconNodeStatus.Incompatible, + RestBeaconNodeStatus.Compatible, + RestBeaconNodeStatus.OptSynced, + RestBeaconNodeStatus.NotSynced}: + let status = await vc.checkSync(node) + node.status = status + return nstatus != status + +proc checkNodes*(service: FallbackServiceRef): Future[bool] {.async.} = let - nodesToCheck = service.client.unusableNodes() + nodesToCheck = service.client.otherNodes() pendingChecks = nodesToCheck.mapIt(service.client.checkNode(it)) - + var res = false try: await allFutures(pendingChecks) + for fut in pendingChecks: + if fut.completed() and fut.read(): + res = true except CancelledError as exc: var pending: seq[Future[void]] for future in pendingChecks: @@ -266,6 +278,7 @@ proc checkNodes*(service: FallbackServiceRef) {.async.} = pending.add(future.cancelAndWait()) await allFutures(pending) raise exc + return res proc mainLoop(service: FallbackServiceRef) {.async.} = let vc = service.client @@ -278,19 +291,8 @@ proc mainLoop(service: FallbackServiceRef) {.async.} = # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - await service.checkNodes() + if await service.checkNodes(): service.changesEvent.fire() await sleepAsync(2.seconds) - if service.client.onlineNodesCount() != 0: - service.onlineEvent.fire() - else: - let counter = vc.getNodeCounts() - warn "No suitable beacon nodes available", - online_nodes = counter.online, - offline_nodes = counter.offline, - uninitalized_nodes = counter.uninitalized, - incompatible_nodes = counter.incompatible, - nonsynced_nodes = counter.nosync, - total_nodes = len(vc.beaconNodes) false except CancelledError as exc: debug "Service interrupted" @@ -308,10 +310,10 @@ proc init*(t: typedesc[FallbackServiceRef], logScope: service = ServiceName var res = FallbackServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized, - onlineEvent: newAsyncEvent()) + changesEvent: newAsyncEvent()) debug "Initializing service" # Perform initial nodes check. - await res.checkNodes() + if await res.checkNodes(): res.changesEvent.fire() return res proc start*(service: FallbackServiceRef) = diff --git a/beacon_chain/validator_client/fork_service.nim b/beacon_chain/validator_client/fork_service.nim index b6cd114e4..da1649f99 100644 --- a/beacon_chain/validator_client/fork_service.nim +++ b/beacon_chain/validator_client/fork_service.nim @@ -53,7 +53,8 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} = try: await vc.getForkSchedule(ApiStrategyKind.Best) except ValidatorApiError as exc: - error "Unable to retrieve fork schedule", reason = exc.msg + error "Unable to retrieve fork schedule", + reason = exc.getFailureReason(), err_msg = exc.msg return except CancelledError as exc: debug "Fork retrieval process was interrupted" diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index 1fd277312..ce7fb254e 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -57,11 +57,12 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef, let res = try: await vc.submitPoolSyncCommitteeSignature(message, ApiStrategyKind.First) - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to publish sync committee message", message = shortLog(message), validator = shortLog(validator), - validator_index = vindex + validator_index = vindex, + reason = exc.getFailureReason() return false except CancelledError: debug "Publish sync committee message request was interrupted" @@ -171,12 +172,13 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef, try: await vc.publishContributionAndProofs(@[restSignedProof], ApiStrategyKind.First) - except ValidatorApiError as err: + except ValidatorApiError as exc: error "Unable to publish sync contribution", contribution = shortLog(proof.contribution), validator = shortLog(validator), validator_index = validatorIdx, - err_msg = err.msg + err_msg = exc.msg, + reason = exc.getFailureReason() false except CancelledError: debug "Publish sync contribution request was interrupted" @@ -278,9 +280,10 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef, let aggContribution = try: await contributionsFuts[item.subcommitteeIdx] - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to get sync message contribution data", slot = slot, - beaconBlockRoot = shortLog(beaconBlockRoot) + beaconBlockRoot = shortLog(beaconBlockRoot), + reason = exc.getFailureReason() return except CancelledError: debug "Request for sync message contribution was interrupted" @@ -357,12 +360,13 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, res.data.root else: if res.execution_optimistic.get(): - error "Could not obtain head block's root because beacon node " & - "only optimistically synced", slot = slot + notice "Execution client not in sync; skipping validator duties " & + "for now", slot = slot return res.data.root except ValidatorApiError as exc: - error "Unable to retrieve head block's root to sign", reason = exc.msg + error "Unable to retrieve head block's root to sign", reason = exc.msg, + reason = exc.getFailureReason() return except CancelledError: debug "Block root request was interrupted" @@ -376,9 +380,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, await service.produceAndPublishSyncCommitteeMessages(slot, beaconBlockRoot, duties) - except ValidatorApiError: + except ValidatorApiError as exc: error "Unable to proceed sync committee messages", slot = slot, - duties_count = len(duties) + duties_count = len(duties), reason = exc.getFailureReason() return except CancelledError: debug "Sync committee producing process was interrupted"