From 8417b7e06492eb75098a7ec4b53b33ded101c63c Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Thu, 3 Nov 2022 21:23:33 +0200 Subject: [PATCH] Sync committee subscription fixes. (#4281) * Preparing code. * Fix prepareXXX procedures to use `onceToAll` strategy only. * Remove lighthouse like subscription code. * Address review comments. --- beacon_chain/rpc/rest_validator_api.nim | 21 +- beacon_chain/validator_client/api.nim | 243 ++++++------------ .../validator_client/duties_service.nim | 18 +- .../sync_committee_service.nim | 4 +- 4 files changed, 99 insertions(+), 187 deletions(-) diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim index a0f296dac..bdf55e93b 100644 --- a/beacon_chain/rpc/rest_validator_api.nim +++ b/beacon_chain/rpc/rest_validator_api.nim @@ -656,6 +656,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = contentBody: Option[ContentBody]) -> RestApiResponse: let subscriptions = block: + var res: seq[RestSyncCommitteeSubscription] if contentBody.isNone(): return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) let dres = decodeBody(seq[RestSyncCommitteeSubscription], @@ -671,20 +672,22 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = return RestApiResponse.jsonError(Http400, EpochFromTheIncorrectForkError) if uint64(item.validator_index) >= - lenu64(getStateField(node.dag.headState, validators)): + lenu64(getStateField(node.dag.headState, validators)): return RestApiResponse.jsonError(Http400, InvalidValidatorIndexValueError) - let validator_pubkey = - getStateField(node.dag.headState, validators).item( - item.validator_index).pubkey + res.add(item) + res - node.syncCommitteeMsgPool + for item in subscriptions: + let validator_pubkey = + getStateField(node.dag.headState, validators).item( + item.validator_index).pubkey + + node.syncCommitteeMsgPool .syncCommitteeSubscriptions[validator_pubkey] = item.until_epoch - node.validatorMonitor[].addAutoMonitor( - validator_pubkey, ValidatorIndex(item.validator_index)) - - subs + node.validatorMonitor[].addAutoMonitor( + validator_pubkey, ValidatorIndex(item.validator_index)) return RestApiResponse.jsonMsgResponse(SyncCommitteeSubscriptionSuccess) diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 4e0b82d15..a73562924 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -1775,180 +1775,85 @@ proc publishBlock*( proc prepareBeaconCommitteeSubnet*( vc: ValidatorClientRef, data: seq[RestCommitteeSubscription], - strategy: ApiStrategyKind - ): Future[bool] {.async.} = - logScope: - request = "prepareBeaconCommitteeSubnet" - strategy = $strategy - - const - ErrorMessage = "Unable to prepare committee subnet" - NoErrorMessage = "Commitee subnet was successfully prepared" - - case strategy - of ApiStrategyKind.First, ApiStrategyKind.Best: - var status = false - let res = vc.firstSuccessParallel(RestPlainResponse, OneThirdDuration, - {BeaconNodeRole.Duties}, - prepareBeaconCommitteeSubnet(it, data)): - if apiResponse.isErr(): - debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + ): Future[int] {.async.} = + logScope: request = "prepareBeaconCommitteeSubnet" + let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + {BeaconNodeRole.AggregatedData}, + prepareBeaconCommitteeSubnet(it, data)) + if len(resp.data) == 0: + # We did not get any response from beacon nodes. + case resp.status + of ApiOperation.Success: + # This should not be happened, there should be present at least one + # successfull response. + return 0 + of ApiOperation.Timeout: + debug "Unable to subscribe to beacon committee subnets in time", + timeout = SlotDuration + return 0 + of ApiOperation.Interrupt: + debug "Beacon committee subscription request was interrupted" + return 0 + of ApiOperation.Failure: + debug "Unexpected error happened while subscribing to beacon committee " & + "subnets" + return 0 + else: + var count = 0 + for apiResponse in resp.data: + if apiResponse.data.isErr(): + debug "Unable to subscribe to beacon committee subnets", + endpoint = apiResponse.node, error = apiResponse.data.error() else: - let response = apiResponse.get() - case response.status - of 200: - trace NoErrorMessage, endpoint = node - status = true - RestBeaconNodeStatus.Online - of 400: - debug ResponseInvalidError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Online - of 500: - debug ResponseInternalError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - of 503: - debug ResponseNoSyncError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced + let response = apiResponse.data.get() + if response.status == 200: + inc(count) else: - debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - if res.isErr(): - raise newException(ValidatorApiError, res.error()) - return status - - of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, OneThirdDuration, - {BeaconNodeRole.Duties}, - prepareBeaconCommitteeSubnet(it, data)): - if apiResponse.isErr(): - debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline - else: - let response = apiResponse.get() - case response.status - of 200: - trace NoErrorMessage, endpoint = node - return true - of 400: - debug ResponseInvalidError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - return false - of 500: - debug ResponseInternalError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - of 503: - debug ResponseNoSyncError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced - else: - debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - - raise newException(ValidatorApiError, ErrorMessage) + debug "Subscription to beacon commitee subnets failed", + status = response.status, endpoint = apiResponse.node, + message = response.getErrorMessage() + return count proc prepareSyncCommitteeSubnets*( vc: ValidatorClientRef, data: seq[RestSyncCommitteeSubscription], - strategy: ApiStrategyKind - ): Future[bool] {.async.} = - logScope: - request = "prepareSyncCommitteeSubnet" - strategy = $strategy - - const - ErrorMessage = "Unable to prepare sync committee subnet" - NoErrorMessage = "Commitee subnet was successfully prepared" - - case strategy - of ApiStrategyKind.First, ApiStrategyKind.Best: - var status = false - let res = vc.firstSuccessParallel(RestPlainResponse, OneThirdDuration, - {BeaconNodeRole.Duties}, - prepareSyncCommitteeSubnets(it, data)): - if apiResponse.isErr(): - debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline + ): Future[int] {.async.} = + logScope: request = "prepareSyncCommitteeSubnet" + let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + {BeaconNodeRole.SyncCommitteeData}, + prepareSyncCommitteeSubnets(it, data)) + if len(resp.data) == 0: + # We did not get any response from beacon nodes. + case resp.status + of ApiOperation.Success: + # This should not be happened, there should be present at least one + # successfull response. + return 0 + of ApiOperation.Timeout: + debug "Unable to prepare sync committee subnets in time", + timeout = SlotDuration + return 0 + of ApiOperation.Interrupt: + debug "Sync committee subnets preparation request was interrupted" + return 0 + of ApiOperation.Failure: + debug "Unexpected error happened while preparing sync committee subnets" + return 0 + else: + var count = 0 + for apiResponse in resp.data: + if apiResponse.data.isErr(): + debug "Unable to prepare sync committee subnets", + endpoint = apiResponse.node, error = apiResponse.data.error() else: - let response = apiResponse.get() - case response.status - of 200: - trace NoErrorMessage, endpoint = node - status = true - RestBeaconNodeStatus.Online - of 400: - debug ResponseInvalidError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Online - of 500: - debug ResponseInternalError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - of 503: - debug ResponseNoSyncError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced + let response = apiResponse.data.get() + if response.status == 200: + inc(count) else: - debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - if res.isErr(): - raise newException(ValidatorApiError, res.error()) - return status - - of ApiStrategyKind.Priority: - vc.firstSuccessSequential(RestPlainResponse, OneThirdDuration, - {BeaconNodeRole.Duties}, - prepareSyncCommitteeSubnets(it, data)): - if apiResponse.isErr(): - debug ErrorMessage, endpoint = node, error = apiResponse.error() - RestBeaconNodeStatus.Offline - else: - let response = apiResponse.get() - case response.status - of 200: - trace NoErrorMessage, endpoint = node - return true - of 400: - debug ResponseInvalidError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - return false - of 500: - debug ResponseInternalError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - of 503: - debug ResponseNoSyncError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.NotSynced - else: - debug ResponseUnexpectedError, response_code = response.status, - endpoint = node, - response_error = response.getErrorMessage() - RestBeaconNodeStatus.Offline - - raise newException(ValidatorApiError, ErrorMessage) + debug "Sync committee subnets preparation failed", + status = response.status, endpoint = apiResponse.node, + message = response.getErrorMessage() + return count proc getValidatorsActivity*( vc: ValidatorClientRef, epoch: Epoch, @@ -2089,7 +1994,8 @@ proc prepareBeaconProposer*( inc(count) else: debug "Beacon proposer preparation failed", status = response.status, - endpoint = apiResponse.node + endpoint = apiResponse.node, + message = response.getErrorMessage() return count proc registerValidator*( @@ -2129,5 +2035,6 @@ proc registerValidator*( inc(count) else: debug "Unable to register validators with beacon node", - status = response.status, endpoint = apiResponse.node + status = response.status, endpoint = apiResponse.node, + message = response.getErrorMessage() return count diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index ec267a5e8..578c9a7ed 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -425,10 +425,11 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} = res if len(subscriptions) > 0: - let res = await vc.prepareBeaconCommitteeSubnet(subscriptions, - ApiStrategyKind.First) - if not(res): - error "Failed to subscribe validators" + let res = await vc.prepareBeaconCommitteeSubnet(subscriptions) + if res == 0: + error "Failed to subscribe validators to beacon committee subnets", + slot = currentSlot, epoch = currentEpoch, + subscriptions_count = len(subscriptions) vc.pruneAttesterDuties(currentEpoch) @@ -470,10 +471,11 @@ proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} = res.add(sub) res if len(subscriptions) > 0: - let res = await vc.prepareSyncCommitteeSubnets(subscriptions, - ApiStrategyKind.First) - if not(res): - error "Failed to subscribe validators" + let res = await vc.prepareSyncCommitteeSubnets(subscriptions) + if res != 0: + error "Failed to subscribe validators to sync committee subnets", + slot = currentSlot, epoch = currentEpoch, + subscriptions_count = len(subscriptions) vc.pruneSyncCommitteeDuties(currentSlot) diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index 945503ae2..d7db74f73 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -400,8 +400,8 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} = proc init*(t: typedesc[SyncCommitteeServiceRef], vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} = logScope: service = ServiceName - let res = SyncCommitteeServiceRef(name: ServiceName, - client: vc, state: ServiceState.Initialized) + let res = SyncCommitteeServiceRef(name: ServiceName, client: vc, + state: ServiceState.Initialized) debug "Initializing service" return res