diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 4349c4588..991ae58cb 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -938,6 +938,11 @@ type defaultValue: false name: "payload-builder" .}: bool + distributedEnabled* {. + desc: "Enable usage of beacon node middleware (BETA)" + defaultValue: false + name: "distributed".}: bool + beaconNodes* {. desc: "URL addresses to one or more beacon node HTTP REST APIs", defaultValue: @[defaultBeaconNodeUri] diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index 27c6bc494..77545cdd4 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -122,7 +122,9 @@ type seq[RestSyncCommitteeSubscription] | seq[SignedAggregateAndProof] | seq[SignedValidatorRegistrationV1] | - seq[ValidatorIndex] + seq[ValidatorIndex] | + seq[RestBeaconCommitteeSelection] | + seq[RestSyncCommitteeSelection] DecodeTypes* = DataEnclosedObject | diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index 219c024e7..2eac7e056 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -639,6 +639,17 @@ type RestRoot* = object root*: Eth2Digest + RestBeaconCommitteeSelection* = object + validator_index*: RestValidatorIndex + slot*: Slot + selection_proof*: ValidatorSig + + RestSyncCommitteeSelection* = object + validator_index*: RestValidatorIndex + slot*: Slot + subcommittee_index*: uint64 + selection_proof*: ValidatorSig + # Types based on the OAPI yaml file - used in responses to requests GetBeaconHeadResponse* = DataEnclosedObject[Slot] GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation] @@ -684,6 +695,8 @@ type SubmitBlindedBlockResponseCapella* = DataEnclosedObject[capella.ExecutionPayload] GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]] GetValidatorsLivenessResponse* = DataEnclosedObject[seq[RestLivenessItem]] + SubmitBeaconCommitteeSelectionsResponse* = DataEnclosedObject[seq[RestBeaconCommitteeSelection]] + SubmitSyncCommitteeSelectionsResponse* = DataEnclosedObject[seq[RestSyncCommitteeSelection]] RestNodeValidity* {.pure.} = enum valid = "VALID", diff --git a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim index 6b3657b8e..3123dd962 100644 --- a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim @@ -185,3 +185,18 @@ proc getValidatorsLiveness*(epoch: Epoch, ): RestPlainResponse {. rest, endpoint: "/eth/v1/validator/liveness/{epoch}", meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/getLiveness + +proc submitBeaconCommitteeSelectionsPlain*( + body: seq[RestBeaconCommitteeSelection] + ): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/beacon_committee_selections", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/submitBeaconCommitteeSelections + +proc submitSyncCommitteeSelectionsPlain*( + body: seq[RestSyncCommitteeSelection] + ): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/sync_committee_selections", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/submitSyncCommitteeSelections diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 6c5cf8520..d4b8594bb 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -21,6 +21,8 @@ const ResponseNoSyncError = "Received nosync error response" ResponseDecodeError = "Received response could not be decoded" ResponseECNotInSyncError* = "Execution client not in sync" + ResponseNotImplementedError = + "Received endpoint not implemented error response" type ApiResponse*[T] = Result[T, string] @@ -2431,3 +2433,275 @@ proc getValidatorsLiveness*( res return GetValidatorsLivenessResponse(data: response) + +proc submitBeaconCommitteeSelections*( + vc: ValidatorClientRef, + data: seq[RestBeaconCommitteeSelection], + strategy: ApiStrategyKind + ): Future[SubmitBeaconCommitteeSelectionsResponse] {.async.} = + logScope: + request = "submitBeaconCommitteeSelections" + strategy = $strategy + + const ErrorMessage = "Unable to submit beacon committee selections" + + var failures: seq[ApiNodeFailure] + + case strategy + of ApiStrategyKind.First, ApiStrategyKind.Best: + let res = vc.firstSuccessParallel( + RestPlainResponse, + SubmitBeaconCommitteeSelectionsResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitBeaconCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + debug ErrorMessage, endpoint = node, error = apiResponse.error + node.updateStatus(RestBeaconNodeStatus.Offline) + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + apiResponse.error) + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isErr(): + node.updateStatus(RestBeaconNodeStatus.Unexpected) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err($res.error) + else: + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].ok(res.get()) + of 400: + debug ResponseInvalidError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseInvalidError) + of 500: + debug ResponseInternalError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.InternalError) + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseInternalError) + of 501: + warn ResponseNotImplementedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseNotImplementedError) + of 503: + debug ResponseNoSyncError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.NotSynced) + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseNoSyncError) + else: + debug ResponseUnexpectedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Unexpected) + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseUnexpectedError) + + if res.isErr(): + raise (ref ValidatorApiError)(msg: res.error, data: failures) + return res.get() + + of ApiStrategyKind.Priority: + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitBeaconCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + debug ErrorMessage, endpoint = node, error = apiResponse.error + node.updateStatus(RestBeaconNodeStatus.Offline) + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isOk(): + return res.get() + debug ResponseDecodeError, response_code = response.status, + endpoint = node, reason = res.error + node.updateStatus(RestBeaconNodeStatus.Unexpected) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false + of 400: + debug ResponseInvalidError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false + of 500: + debug ResponseInternalError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.InternalError) + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false + of 501: + warn ResponseNotImplementedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false + of 503: + debug ResponseNoSyncError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.NotSynced) + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false + else: + debug ResponseUnexpectedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Unexpected) + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) + +proc submitSyncCommitteeSelections*( + vc: ValidatorClientRef, + data: seq[RestSyncCommitteeSelection], + strategy: ApiStrategyKind + ): Future[SubmitSyncCommitteeSelectionsResponse] {.async.} = + logScope: + request = "submitSyncCommitteeSelections" + strategy = $strategy + + const ErrorMessage = "Unable to submit sync committee selections" + + var failures: seq[ApiNodeFailure] + + case strategy + of ApiStrategyKind.First, ApiStrategyKind.Best: + let res = vc.firstSuccessParallel( + RestPlainResponse, + SubmitSyncCommitteeSelectionsResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitSyncCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + debug ErrorMessage, endpoint = node, error = apiResponse.error + node.updateStatus(RestBeaconNodeStatus.Offline) + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + apiResponse.error) + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isErr(): + node.updateStatus(RestBeaconNodeStatus.Unexpected) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err($res.error) + else: + ApiResponse[SubmitSyncCommitteeSelectionsResponse].ok(res.get()) + of 400: + debug ResponseInvalidError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseInvalidError) + of 500: + debug ResponseInternalError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.InternalError) + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseInternalError) + of 501: + warn ResponseNotImplementedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseNotImplementedError) + of 503: + debug ResponseNoSyncError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.NotSynced) + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseNoSyncError) + else: + debug ResponseUnexpectedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Unexpected) + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseUnexpectedError) + + if res.isErr(): + raise (ref ValidatorApiError)(msg: res.error, data: failures) + return res.get() + + of ApiStrategyKind.Priority: + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitSyncCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + debug ErrorMessage, endpoint = node, error = apiResponse.error + node.updateStatus(RestBeaconNodeStatus.Offline) + failures.add(ApiNodeFailure.init(node, ApiFailure.Communication)) + false + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isOk(): + return res.get() + debug ResponseDecodeError, response_code = response.status, + endpoint = node, reason = res.error + node.updateStatus(RestBeaconNodeStatus.Unexpected) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false + of 400: + debug ResponseInvalidError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false + of 500: + debug ResponseInternalError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.InternalError) + failures.add(ApiNodeFailure.init(node, ApiFailure.Internal)) + false + of 501: + warn ResponseNotImplementedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Incompatible) + failures.add(ApiNodeFailure.init(node, ApiFailure.Invalid)) + false + of 503: + debug ResponseNoSyncError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.NotSynced) + failures.add(ApiNodeFailure.init(node, ApiFailure.NotSynced)) + false + else: + debug ResponseUnexpectedError, response_code = response.status, + endpoint = node, reason = response.getErrorMessage() + node.updateStatus(RestBeaconNodeStatus.Unexpected) + failures.add(ApiNodeFailure.init(node, ApiFailure.Unexpected)) + false + + raise (ref ValidatorApiError)(msg: ErrorMessage, data: failures) diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index a5a5b4d44..9ca89ff25 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -851,6 +851,19 @@ proc isExpired*(vc: ValidatorClientRef, else: true +func getSelections*( + daps: openArray[DutyAndProof] + ): seq[RestBeaconCommitteeSelection] = + var res: seq[RestBeaconCommitteeSelection] + for item in daps: + if item.slotSig.isSome(): + res.add(RestBeaconCommitteeSelection( + validator_index: RestValidatorIndex(item.data.validator_index), + slot: item.data.slot, + selection_proof: item.slotSig.get() + )) + res + proc getValidatorRegistration( vc: ValidatorClientRef, validator: AttachedValidator, diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 3ee98d050..e8dcb2ff2 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -198,12 +198,12 @@ proc pollForAttesterDuties*(service: DutiesServiceRef, var pendingRequests: seq[Future[SignatureResult]] var validators: seq[AttachedValidator] for item in addOrReplaceItems: - let validator = + let + validator = vc.attachedValidators[].getValidator(item.duty.pubkey).valueOr: - continue - let fork = vc.forkAtEpoch(item.duty.slot.epoch) - let future = validator.getSlotSignature( - fork, genesisRoot, item.duty.slot) + continue + fork = vc.forkAtEpoch(item.duty.slot.epoch) + future = validator.getSlotSignature(fork, genesisRoot, item.duty.slot) pendingRequests.add(future) validators.add(validator) @@ -217,27 +217,85 @@ proc pollForAttesterDuties*(service: DutiesServiceRef, await allFutures(pendingCancel) raise exc - for index, fut in pendingRequests: - let item = addOrReplaceItems[index] - let dap = - if fut.completed(): - let sigRes = fut.read() - if sigRes.isErr(): - warn "Unable to create slot signature using remote signer", - validator = shortLog(validators[index]), - error_msg = sigRes.error() - DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, - Opt.none(ValidatorSig)) - else: - DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, - Opt.some(sigRes.get())) - else: - DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, - Opt.none(ValidatorSig)) + let daps = + block: + var res: seq[DutyAndProof] + for index, fut in pendingRequests: + let + item = addOrReplaceItems[index] + dap = + if fut.done(): + let sigRes = fut.read() + if sigRes.isErr(): + warn "Unable to create slot signature using remote signer", + validator = shortLog(validators[index]), + error_msg = sigRes.error() + DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, + none[ValidatorSig]()) + else: + DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, + some(sigRes.get())) + else: + DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, + none[ValidatorSig]()) + res.add(dap) + res - var validatorDuties = vc.attesters.getOrDefault(item.duty.pubkey) - validatorDuties.duties[item.epoch] = dap - vc.attesters[item.duty.pubkey] = validatorDuties + for item in daps: + # Update VC attesters registry with current version of DutyAndProof. + vc.attesters.mgetOrPut(item.data.pubkey, + default(EpochDuties)).duties[item.epoch] = item + + if vc.config.distributedEnabled: + let selections = daps.getSelections() + if len(selections) == 0: + return len(addOrReplaceItems) + + let sresponse = + try: + # Query middleware for aggregated signatures. + await vc.submitBeaconCommitteeSelections(selections, + ApiStrategyKind.Best) + except ValidatorApiError as exc: + warn "Unable to submit beacon committee selections", epoch = epoch, + reason = exc.getFailureReason() + return 0 + except CancelledError as exc: + debug "Beacon committee selections processing was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while trying to submit beacon " & + "committee selections", epoch = epoch, err_name = exc.name, + err_msg = exc.msg + return 0 + + for selection in sresponse.data: + let selectionProof = selection.selection_proof.load().valueOr: + warn "Invalid signature encountered while processing beacon " & + "committee selections", + validator_index = ValidatorIndex(selection.validator_index), + slot = selection.slot, + selection_proof = shortLog(selection.selection_proof) + continue + + let dres = + block: + var res: Opt[DutyAndProof] + for dap in daps: + if (uint64(dap.data.validator_index) == + uint64(selection.validator_index)) and + (dap.data.slot == selection.slot): + var ndap = dap + ndap.slotSig = some(selection.selection_proof) + res = Opt.some(ndap) + break + res + + if dres.isSome(): + # Update VC attesters registry with new aggregated DutyAndProof. + let dap = dres.get() + vc.attesters.mgetOrPut( + dap.data.pubkey, default(EpochDuties)).duties[dap.epoch] = dap return len(addOrReplaceItems) diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index 671e4e7d8..c51fc31f4 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -406,7 +406,7 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, await service.produceAndPublishContributions(slot, beaconBlockRoot, duties) proc processSyncCommitteeTasks(service: SyncCommitteeServiceRef, - slot: Slot) {.async.} = + slot: Slot) {.async.} = let vc = service.client duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)