diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index a67913aeb..32516bfb9 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -595,13 +595,15 @@ OK: 24/24 Fail: 0/24 Skip: 0/24 OK: 1/1 Fail: 0/1 Skip: 0/1 ## Validator Client test suite ```diff ++ /eth/v1/validator/beacon_committee_selections serialization/deserialization test OK ++ /eth/v1/validator/sync_committee_selections serialization/deserialization test OK + bestSuccess() API timeout test OK + firstSuccessParallel() API timeout test OK + getAttestationDataScore() test vectors OK + getLiveness() response deserialization test OK + normalizeUri() test vectors OK ``` -OK: 5/5 Fail: 0/5 Skip: 0/5 +OK: 7/7 Fail: 0/7 Skip: 0/7 ## Validator change pool testing suite ```diff + addValidatorChangeMessage/getAttesterSlashingMessage OK @@ -714,4 +716,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2 OK: 9/9 Fail: 0/9 Skip: 0/9 ---TOTAL--- -OK: 403/408 Fail: 0/408 Skip: 5/408 +OK: 405/410 Fail: 0/410 Skip: 5/410 diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 274c443d1..450666d31 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -1027,6 +1027,11 @@ type defaultValue: false name: "payload-builder" .}: bool + distributedEnabled* {. + desc: "Enable usage of Obol middleware (BETA)" + defaultValue: false + name: "distributed".}: bool + beaconNodes* {. desc: "URL addresses to one or more beacon node HTTP REST APIs", defaultValue: @[defaultBeaconNodeUri] diff --git a/beacon_chain/rpc/rest_utils.nim b/beacon_chain/rpc/rest_utils.nim index b95625a7a..11e5070fc 100644 --- a/beacon_chain/rpc/rest_utils.nim +++ b/beacon_chain/rpc/rest_utils.nim @@ -20,10 +20,6 @@ export results, eth2_rest_serialization, blockchain_dag, presto, rest_types, rest_constants, rest_common -type - ValidatorIndexError* {.pure.} = enum - UnsupportedValue, TooHighValue - func match(data: openArray[char], charset: set[char]): int = for ch in data: if ch notin charset: @@ -216,26 +212,6 @@ template strData*(body: ContentBody): string = bind fromBytes string.fromBytes(body.data) -func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex, - ValidatorIndexError] = - when sizeof(ValidatorIndex) == 4: - if uint64(value) < VALIDATOR_REGISTRY_LIMIT: - # On x86 platform Nim allows only `int32` indexes, so all the indexes in - # range `2^31 <= x < 2^32` are not supported. - if uint64(value) <= uint64(high(int32)): - ok(ValidatorIndex(value)) - else: - err(ValidatorIndexError.UnsupportedValue) - else: - err(ValidatorIndexError.TooHighValue) - elif sizeof(ValidatorIndex) == 8: - if uint64(value) < VALIDATOR_REGISTRY_LIMIT: - ok(ValidatorIndex(value)) - else: - err(ValidatorIndexError.TooHighValue) - else: - doAssert(false, "ValidatorIndex type size is incorrect") - func syncCommitteeParticipants*(forkedState: ForkedHashedBeaconState, epoch: Epoch ): Result[seq[ValidatorPubKey], cstring] = diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index 546bb2d38..42b030d80 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -122,7 +122,9 @@ type seq[RestSyncCommitteeSubscription] | seq[SignedAggregateAndProof] | seq[SignedValidatorRegistrationV1] | - seq[ValidatorIndex] + seq[ValidatorIndex] | + seq[RestBeaconCommitteeSelection] | + seq[RestSyncCommitteeSelection] DecodeTypes* = DataEnclosedObject | diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index b442a495c..46acb0b3e 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -63,6 +63,9 @@ type ValidatorQueryKind* {.pure.} = enum Index, Key + ValidatorIndexError* {.pure.} = enum + UnsupportedValue, TooHighValue + ValidatorIdent* = object case kind*: ValidatorQueryKind of ValidatorQueryKind.Index: @@ -515,6 +518,17 @@ type timestamp3*: uint64 delay*: uint64 + RestBeaconCommitteeSelection* = object + validator_index*: RestValidatorIndex + slot*: Slot + selection_proof*: ValidatorSig + + RestSyncCommitteeSelection* = object + validator_index*: RestValidatorIndex + slot*: Slot + subcommittee_index*: uint64 + selection_proof*: ValidatorSig + # Types based on the OAPI yaml file - used in responses to requests GetBeaconHeadResponse* = DataEnclosedObject[Slot] GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation] @@ -560,6 +574,8 @@ type SubmitBlindedBlockResponseDeneb* = DataEnclosedObject[deneb_mev.ExecutionPayloadAndBlobsBundle] GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]] GetValidatorsLivenessResponse* = DataEnclosedObject[seq[RestLivenessItem]] + SubmitBeaconCommitteeSelectionsResponse* = DataEnclosedObject[seq[RestBeaconCommitteeSelection]] + SubmitSyncCommitteeSelectionsResponse* = DataEnclosedObject[seq[RestSyncCommitteeSelection]] RestNodeValidity* {.pure.} = enum valid = "VALID", @@ -904,3 +920,23 @@ func init*(t: typedesc[RestErrorMessage], code: HttpCode, message: string, stacktrace: openArray[string]): RestErrorMessage = RestErrorMessage(code: code.toInt(), message: message, stacktraces: Opt.some(@stacktrace)) + +func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex, + ValidatorIndexError] = + when sizeof(ValidatorIndex) == 4: + if uint64(value) < VALIDATOR_REGISTRY_LIMIT: + # On x86 platform Nim allows only `int32` indexes, so all the indexes in + # range `2^31 <= x < 2^32` are not supported. + if uint64(value) <= uint64(high(int32)): + ok(ValidatorIndex(value)) + else: + err(ValidatorIndexError.UnsupportedValue) + else: + err(ValidatorIndexError.TooHighValue) + elif sizeof(ValidatorIndex) == 8: + if uint64(value) < VALIDATOR_REGISTRY_LIMIT: + ok(ValidatorIndex(value)) + else: + err(ValidatorIndexError.TooHighValue) + else: + doAssert(false, "ValidatorIndex type size is incorrect") diff --git a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim index 3fc7a0ea8..38f914d3a 100644 --- a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim @@ -167,4 +167,18 @@ proc getValidatorsLiveness*(epoch: Epoch, ): RestPlainResponse {. rest, endpoint: "/eth/v1/validator/liveness/{epoch}", meth: MethodPost.} - ## https://ethereum.github.io/beacon-APIs/#/Validator/getLiveness + +proc submitBeaconCommitteeSelectionsPlain*( + body: seq[RestBeaconCommitteeSelection] + ): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/beacon_committee_selections", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/submitBeaconCommitteeSelections + +proc submitSyncCommitteeSelectionsPlain*( + body: seq[RestSyncCommitteeSelection] + ): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/sync_committee_selections", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/submitSyncCommitteeSelections + diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 9304a73aa..359f5de01 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -22,6 +22,8 @@ const ResponseNoSyncError = "Received nosync error response" ResponseDecodeError = "Received response could not be decoded" ResponseECNotInSyncError* = "Execution client not in sync" + ResponseNotImplementedError = + "Received endpoint not implemented error response" type ApiResponse*[T] = Result[T, string] @@ -772,6 +774,12 @@ template handle500(): untyped {.dirty.} = node.updateStatus(RestBeaconNodeStatus.InternalError, failure) failures.add(failure) +template handle501(): untyped {.dirty.} = + let failure = ApiNodeFailure.init(ApiFailure.NotImplemented, RequestName, + strategy, node, response.status, response.getErrorMessage()) + node.updateStatus(RestBeaconNodeStatus.Incompatible, failure) + failures.add(failure) + template handle503(): untyped {.dirty.} = let failure = ApiNodeFailure.init(ApiFailure.NotSynced, RequestName, strategy, node, response.status, response.getErrorMessage()) @@ -2659,3 +2667,195 @@ proc getFinalizedBlockHeader*( return Opt.some(oldestBlockHeader) else: return Opt.none(GetBlockHeaderResponse) + +proc submitBeaconCommitteeSelections*( + vc: ValidatorClientRef, + data: seq[RestBeaconCommitteeSelection], + strategy: ApiStrategyKind + ): Future[SubmitBeaconCommitteeSelectionsResponse] {.async.} = + const + RequestName = "submitBeaconCommitteeSelections" + + var failures: seq[ApiNodeFailure] + + case strategy + of ApiStrategyKind.First, ApiStrategyKind.Best: + let res = vc.firstSuccessParallel( + RestPlainResponse, + SubmitBeaconCommitteeSelectionsResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitBeaconCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + handleCommunicationError() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + apiResponse.error) + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isErr(): + handleUnexpectedData() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err($res.error) + else: + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].ok(res.get()) + of 400: + handle400() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseInvalidError) + of 500: + handle500() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseInternalError) + of 501: + handle501() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseNotImplementedError) + of 503: + handle503() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseNoSyncError) + else: + handleUnexpectedCode() + ApiResponse[SubmitBeaconCommitteeSelectionsResponse].err( + ResponseUnexpectedError) + + if res.isErr(): + raise (ref ValidatorApiError)(msg: res.error, data: failures) + return res.get() + + of ApiStrategyKind.Priority: + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitBeaconCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + handleCommunicationError() + false + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isOk(): return res.get() + handleUnexpectedData() + false + of 400: + handle400() + false + of 500: + handle500() + false + of 501: + handle501() + false + of 503: + handle503() + false + else: + handleUnexpectedCode() + false + + raise (ref ValidatorApiError)( + msg: "Failed to submit beacon committee selections", data: failures) + +proc submitSyncCommitteeSelections*( + vc: ValidatorClientRef, + data: seq[RestSyncCommitteeSelection], + strategy: ApiStrategyKind + ): Future[SubmitSyncCommitteeSelectionsResponse] {.async.} = + const + RequestName = "submitBeaconCommitteeSelections" + + var failures: seq[ApiNodeFailure] + + case strategy + of ApiStrategyKind.First, ApiStrategyKind.Best: + let res = vc.firstSuccessParallel( + RestPlainResponse, + SubmitSyncCommitteeSelectionsResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitSyncCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + handleCommunicationError() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + apiResponse.error) + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isErr(): + handleUnexpectedData() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err($res.error) + else: + ApiResponse[SubmitSyncCommitteeSelectionsResponse].ok(res.get()) + of 400: + handle400() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseInvalidError) + of 500: + handle500() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseInternalError) + of 501: + handle501() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseNotImplementedError) + of 503: + handle503() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseNoSyncError) + else: + handleUnexpectedCode() + ApiResponse[SubmitSyncCommitteeSelectionsResponse].err( + ResponseUnexpectedError) + + if res.isErr(): + raise (ref ValidatorApiError)(msg: res.error, data: failures) + return res.get() + + of ApiStrategyKind.Priority: + vc.firstSuccessSequential(RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + submitSyncCommitteeSelectionsPlain(it, data)): + if apiResponse.isErr(): + handleCommunicationError() + false + else: + let response = apiResponse.get() + case response.status + of 200: + let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse, + response.data, response.contentType) + if res.isOk(): return res.get() + handleUnexpectedData() + false + of 400: + handle400() + false + of 500: + handle500() + false + of 501: + handle501() + false + of 503: + handle503() + false + else: + handleUnexpectedCode() + false + + raise (ref ValidatorApiError)( + msg: "Failed to submit sync committee selections", data: failures) diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 2d6dd0767..b07db6149 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -239,7 +239,7 @@ type ApiFailure* {.pure.} = enum Communication, Invalid, NotFound, OptSynced, NotSynced, Internal, - UnexpectedCode, UnexpectedResponse, NoError + NotImplemented, UnexpectedCode, UnexpectedResponse, NoError ApiNodeFailure* = object node*: BeaconNodeServerRef @@ -255,22 +255,6 @@ type ValidatorApiError* = object of ValidatorClientError data*: seq[ApiNodeFailure] - FillSignaturesResult* = object - signaturesRequested*: int - signaturesReceived*: int - - AttestationSlotRequest* = object - validator*: AttachedValidator - fork*: Fork - slot*: Slot - - SyncCommitteeSlotRequest* = object - validator*: AttachedValidator - fork*: Fork - slot*: Slot - sync_committee_index*: IndexInSyncCommittee - duty*: SyncCommitteeDuty - const DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH) DefaultSyncCommitteeDuty* = SyncCommitteeDuty() @@ -386,6 +370,7 @@ proc `$`*(failure: ApiFailure): string = of ApiFailure.NotSynced: "not-synced" of ApiFailure.OptSynced: "opt-synced" of ApiFailure.Internal: "internal-issue" + of ApiFailure.NotImplemented: "not-implemented" of ApiFailure.UnexpectedCode: "unexpected-code" of ApiFailure.UnexpectedResponse: "unexpected-data" of ApiFailure.NoError: "status-update" @@ -1465,240 +1450,6 @@ func `==`*(a, b: SyncCommitteeDuty): bool = compareUnsorted(a.validator_sync_committee_indices, b.validator_sync_committee_indices) -proc cmp(x, y: AttestationSlotRequest|SyncCommitteeSlotRequest): int = - cmp(x.slot, y.slot) - -func getIndex*(proof: SyncCommitteeSelectionProof, - inindex: IndexInSyncCommittee): Opt[int] = - if len(proof) == 0: - return Opt.none(int) - for index, value in proof.pairs(): - if value.sync_committee_index == inindex: - return Opt.some(index) - Opt.none(int) - -func hasSignature*(proof: SyncCommitteeSelectionProof, - inindex: IndexInSyncCommittee, - slot: Slot): bool = - let index = proof.getIndex(inindex).valueOr: return false - proof[index].signatures[int(slot.since_epoch_start())].isSome() - -proc setSignature*(proof: var SyncCommitteeSelectionProof, - inindex: IndexInSyncCommittee, slot: Slot, - signature: Opt[ValidatorSig]) = - let index = proof.getIndex(inindex).expect( - "EpochSelectionProof should be present at this moment") - proof[index].signatures[int(slot.since_epoch_start())] = signature - -proc setSyncSelectionProof*(vc: ValidatorClientRef, pubkey: ValidatorPubKey, - inindex: IndexInSyncCommittee, slot: Slot, - duty: SyncCommitteeDuty, - signature: Opt[ValidatorSig]) = - let - proof = - block: - let length = len(duty.validator_sync_committee_indices) - var res = newSeq[EpochSelectionProof](length) - for i in 0 ..< length: - res[i].sync_committee_index = duty.validator_sync_committee_indices[i] - res - - vc.syncCommitteeProofs. - mgetOrPut(slot.epoch(), default(SyncCommitteeProofs)).proofs. - mgetOrPut(pubkey, proof).setSignature(inindex, slot, signature) - -proc getSyncCommitteeSelectionProof*( - vc: ValidatorClientRef, - pubkey: ValidatorPubKey, - epoch: Epoch - ): Opt[SyncCommitteeSelectionProof] = - vc.syncCommitteeProofs.withValue(epoch, epochProofs): - epochProofs[].proofs.withValue(pubkey, validatorProofs): - return Opt.some(validatorProofs[]) - do: - return Opt.none(SyncCommitteeSelectionProof) - do: - return Opt.none(SyncCommitteeSelectionProof) - -proc getSyncCommitteeSelectionProof*( - vc: ValidatorClientRef, - pubkey: ValidatorPubKey, - slot: Slot, - inindex: IndexInSyncCommittee - ): Opt[ValidatorSig] = - vc.syncCommitteeProofs.withValue(slot.epoch(), epochProofs): - epochProofs[].proofs.withValue(pubkey, validatorProofs): - let index = getIndex(validatorProofs[], inindex).valueOr: - return Opt.none(ValidatorSig) - return validatorProofs[][index].signatures[int(slot.since_epoch_start())] - do: - return Opt.none(ValidatorSig) - do: - return Opt.none(ValidatorSig) - -proc fillSyncCommitteeSelectionProofs*( - service: DutiesServiceRef, - start, finish: Slot - ): Future[FillSignaturesResult] {.async.} = - let - vc = service.client - genesisRoot = vc.beaconGenesis.genesis_validators_root - var - requests = - block: - var res: seq[SyncCommitteeSlotRequest] - for epoch in start.epoch() .. finish.epoch(): - let - fork = vc.forkAtEpoch(epoch) - period = epoch.sync_committee_period() - for duty in vc.syncDutiesForPeriod(period): - let validator = vc.attachedValidators[]. - getValidator(duty.pubkey).valueOr: - # Ignore all the validators which are not here anymore - continue - if validator.index.isNone(): - # Ignore all the valididators which do not have index yet. - continue - let proof = vc.getSyncCommitteeSelectionProof(duty.pubkey, epoch). - get(default(SyncCommitteeSelectionProof)) - for inindex in duty.validator_sync_committee_indices: - for slot in epoch.slots(): - if slot < start: continue - if slot > finish: break - if not(proof.hasSignature(inindex, slot)): - res.add( - SyncCommitteeSlotRequest( - validator: validator, - fork: fork, - slot: slot, - duty: duty, - sync_committee_index: inindex)) - # We make requests sorted by slot number. - sorted(res, cmp, order = SortOrder.Ascending) - sigres = FillSignaturesResult(signaturesRequested: len(requests)) - pendingRequests = requests.mapIt( - FutureBase(getSyncCommitteeSelectionProof( - it.validator, it.fork, genesisRoot, it.slot, - getSubcommitteeIndex(it.sync_committee_index)))) - - while len(pendingRequests) > 0: - try: - discard await race(pendingRequests) - except CancelledError as exc: - let pending = pendingRequests - .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) - await noCancel allFutures(pending) - raise exc - - (requests, pendingRequests) = - block: - var - res1: seq[SyncCommitteeSlotRequest] - res2: seq[FutureBase] - for index, fut in pendingRequests.pairs(): - if not(fut.finished()): - res1.add(requests[index]) - res2.add(fut) - else: - let - request = requests[index] - signature = - if fut.completed(): - let sres = Future[SignatureResult](fut).read() - if sres.isErr(): - warn "Unable to create slot signature using remote signer", - reason = sres.error(), epoch = request.slot.epoch(), - slot = request.slot - Opt.none(ValidatorSig) - else: - inc(sigres.signaturesReceived) - Opt.some(sres.get()) - else: - Opt.none(ValidatorSig) - vc.setSyncSelectionProof(request.validator.pubkey, - request.sync_committee_index, - request.slot, request.duty, - signature) - (res1, res2) - sigres - -proc fillAttestationSelectionProofs*( - service: DutiesServiceRef, - start, finish: Slot - ): Future[FillSignaturesResult] {.async.} = - let - vc = service.client - genesisRoot = vc.beaconGenesis.genesis_validators_root - var - requests = - block: - var res: seq[AttestationSlotRequest] - for epoch in start.epoch() .. finish.epoch(): - for duty in vc.attesterDutiesForEpoch(epoch): - if (duty.data.slot < start) or (duty.data.slot > finish): - # Ignore all the slots which are not in range. - continue - if duty.slotSig.isSome(): - # Ignore all the duties which already has selection proof. - continue - let validator = vc.attachedValidators[]. - getValidator(duty.data.pubkey).valueOr: - # Ignore all the validators which are not here anymore - continue - if validator.index.isNone(): - # Ignore all the valididators which do not have index yet. - continue - res.add(AttestationSlotRequest( - validator: validator, - slot: duty.data.slot, - fork: vc.forkAtEpoch(duty.data.slot.epoch()) - )) - # We make requests sorted by slot number. - sorted(res, cmp, order = SortOrder.Ascending) - sigres = FillSignaturesResult(signaturesRequested: len(requests)) - pendingRequests = requests.mapIt( - FutureBase(getSlotSignature(it.validator, it.fork, genesisRoot, it.slot))) - - while len(pendingRequests) > 0: - try: - discard await race(pendingRequests) - except CancelledError as exc: - let pending = pendingRequests - .filterIt(not(it.finished())).mapIt(it.cancelAndWait()) - await noCancel allFutures(pending) - raise exc - - (requests, pendingRequests) = - block: - var - res1: seq[AttestationSlotRequest] - res2: seq[FutureBase] - for index, fut in pendingRequests.pairs(): - if not(fut.finished()): - res1.add(requests[index]) - res2.add(fut) - else: - let - request = requests[index] - signature = - if fut.completed(): - let sres = Future[SignatureResult](fut).read() - if sres.isErr(): - warn "Unable to create slot signature using remote signer", - reason = sres.error(), epoch = request.slot.epoch(), - slot = request.slot - Opt.none(ValidatorSig) - else: - inc(sigres.signaturesReceived) - Opt.some(sres.get()) - else: - Opt.none(ValidatorSig) - vc.attesters.withValue(request.validator.pubkey, map): - map[].duties.withValue(request.slot.epoch(), dap): - dap[].slotSig = signature - (res1, res2) - sigres - proc updateRuntimeConfig*(vc: ValidatorClientRef, node: BeaconNodeServerRef, info: VCRuntimeConfig): Result[void, string] = diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 561f1bb8a..de98baa16 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -6,8 +6,8 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. import std/[sets, sequtils] -import chronicles -import common, api, block_service +import chronicles, metrics +import common, api, block_service, selection_proofs const ServiceName = "duties_service" @@ -314,12 +314,23 @@ proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} = block: let moment = Moment.now() - sigres = await service.fillAttestationSelectionProofs( - currentSlot, currentSlot + Epoch(1)) - debug "Attestation selection proofs have been received", - signatures_requested = sigres.signaturesRequested, - signatures_received = sigres.signaturesReceived, - time = (Moment.now() - moment) + sigres = + await vc.fillAttestationSelectionProofs(currentSlot, + currentSlot + Epoch(AGGREGATION_PRE_COMPUTE_EPOCHS)) + + if vc.config.distributedEnabled: + debug "Attestation selection proofs have been received", + signatures_requested = sigres.signaturesRequested, + signatures_received = sigres.signaturesReceived, + selections_requested = sigres.selections_requested, + selections_received = sigres.selections_received, + selections_processed = sigres.selections_processed, + total_elapsed_time = (Moment.now() - moment) + else: + debug "Attestation selection proofs have been received", + signatures_requested = sigres.signaturesRequested, + signatures_received = sigres.signaturesReceived, + total_elapsed_time = (Moment.now() - moment) let subscriptions = block: @@ -393,12 +404,23 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} = block: let moment = Moment.now() - sigres = await service.fillSyncCommitteeSelectionProofs( - currentSlot, currentSlot + Epoch(AGGREGATION_PRE_COMPUTE_EPOCHS)) - debug "Sync committee selection proofs have been received", - signatures_requested = sigres.signaturesRequested, - signatures_received = sigres.signaturesReceived, - time = (Moment.now() - moment) + sigres = + await vc.fillSyncCommitteeSelectionProofs(currentSlot, + currentSlot + Epoch(AGGREGATION_PRE_COMPUTE_EPOCHS)) + + if vc.config.distributedEnabled: + debug "Sync committee selection proofs have been received", + signatures_requested = sigres.signaturesRequested, + signatures_received = sigres.signaturesReceived, + selections_requested = sigres.selections_requested, + selections_received = sigres.selections_received, + selections_processed = sigres.selections_processed, + total_elapsed_time = (Moment.now() - moment) + else: + debug "Sync committee selection proofs have been received", + signatures_requested = sigres.signaturesRequested, + signatures_received = sigres.signaturesReceived, + total_elapsed_time = (Moment.now() - moment) let periods = diff --git a/beacon_chain/validator_client/selection_proofs.nim b/beacon_chain/validator_client/selection_proofs.nim new file mode 100644 index 000000000..9018962e6 --- /dev/null +++ b/beacon_chain/validator_client/selection_proofs.nim @@ -0,0 +1,514 @@ +# beacon_chain +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import std/[algorithm, sequtils] +import chronicles, chronos, metrics +import common, api + +{.push raises: [].} + +declareGauge client_slot_signatures_time, + "Time used to obtain slot signatures" + +declareGauge client_sync_committee_selection_proof_time, + "Time used to obtain sync committee selection proofs" + +declareGauge client_obol_aggregated_slot_signatures_time, + "Time used to obtain slot signatures" + +declareGauge client_obol_aggregated_sync_committee_selection_proof_time, + "Time used to obtain sync committee selection proofs" + +type + FillSignaturesResult* = object + signaturesRequested*: int + signaturesReceived*: int + selectionsRequested*: int + selectionsReceived*: int + selectionsProcessed*: int + + AttestationSlotRequest = object + validator: AttachedValidator + fork: Fork + slot: Slot + proof: Opt[ValidatorSig] + future: FutureBase + + SyncCommitteeSlotRequest* = object + validator: AttachedValidator + fork: Fork + slot: Slot + sync_committee_index: IndexInSyncCommittee + sub_committee_index: SyncSubcommitteeIndex + duty: SyncCommitteeDuty + proof: Opt[ValidatorSig] + future: FutureBase + +template withTimeMetric(metricName, body: untyped): untyped = + let momentTime = Moment.now() + try: + body + finally: + let elapsedTime = Moment.now() - momentTime + metrics.set(metricName, elapsedTime.milliseconds()) + +proc cmp(x, y: AttestationSlotRequest|SyncCommitteeSlotRequest): int = + cmp(x.slot, y.slot) + +proc getAttesterDutiesRequests( + vc: ValidatorClientRef, + start, finish: Slot, + genesisRoot: Eth2Digest + ): seq[AttestationSlotRequest] = + var res: seq[AttestationSlotRequest] + for epoch in start.epoch() .. finish.epoch(): + for duty in vc.attesterDutiesForEpoch(epoch): + if (duty.data.slot < start) or (duty.data.slot > finish): + # Ignore all the slots which are not in range. + continue + if duty.slotSig.isSome(): + # Ignore all the duties which already has selection proof. + continue + let validator = vc.attachedValidators[]. + getValidator(duty.data.pubkey).valueOr: + # Ignore all the validators which are not here anymore + continue + if validator.index.isNone(): + # Ignore all the validators which do not have index yet. + continue + + let + fork = vc.forkAtEpoch(duty.data.slot.epoch()) + future = getSlotSignature(validator, fork, genesisRoot, duty.data.slot) + + res.add( + AttestationSlotRequest(validator: validator, slot: duty.data.slot, + fork: fork, future: FutureBase(future))) + # We make requests sorted by slot number. + sorted(res, cmp, order = SortOrder.Ascending) + +proc fillAttestationSelectionProofs*( + vc: ValidatorClientRef, + start, finish: Slot + ): Future[FillSignaturesResult] {.async.} = + let genesisRoot = vc.beaconGenesis.genesis_validators_root + var + requests: seq[AttestationSlotRequest] + sigres: FillSignaturesResult + + withTimeMetric(client_slot_signatures_time): + requests = vc.getAttesterDutiesRequests(start, finish, genesisRoot) + sigres.signaturesRequested = len(requests) + var pendingRequests = requests.mapIt(it.future) + + while len(pendingRequests) > 0: + try: + discard await race(pendingRequests) + except CancelledError as exc: + var pending: seq[Future[void]] + for future in pendingRequests: + if not(future.finished()): pending.add(future.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + + pendingRequests = + block: + var res: seq[FutureBase] + for mreq in requests.mitems(): + if isNil(mreq.future): continue + if not(mreq.future.finished()): + res.add(mreq.future) + else: + let signature = + if mreq.future.completed(): + let sres = Future[SignatureResult](mreq.future).read() + if sres.isErr(): + warn "Unable to create slot signature using remote signer", + reason = sres.error(), epoch = mreq.slot.epoch(), + slot = mreq.slot + Opt.none(ValidatorSig) + else: + inc(sigres.signaturesReceived) + Opt.some(sres.get()) + else: + Opt.none(ValidatorSig) + + mreq.future = nil + mreq.proof = signature + + if signature.isSome(): + vc.attesters.withValue(mreq.validator.pubkey, map): + map[].duties.withValue(mreq.slot.epoch(), dap): + dap[].slotSig = signature + res + + if vc.config.distributedEnabled: + withTimeMetric(client_obol_aggregated_slot_signatures_time): + let (indexToKey, selections) = + block: + var + res1: Table[ValidatorIndex, Opt[ValidatorPubKey]] + res2: seq[RestBeaconCommitteeSelection] + + for mreq in requests.mitems(): + if mreq.proof.isSome(): + res1[mreq.validator.index.get()] = Opt.some(mreq.validator.pubkey) + res2.add(RestBeaconCommitteeSelection( + validator_index: RestValidatorIndex(mreq.validator.index.get()), + slot: mreq.slot, selection_proof: mreq.proof.get())) + (res1, res2) + + sigres.selectionsRequested = len(selections) + + if len(selections) == 0: + return sigres + + let sresponse = + try: + # Query middleware for aggregated signatures. + await vc.submitBeaconCommitteeSelections(selections, + ApiStrategyKind.Best) + except ValidatorApiError as exc: + warn "Unable to submit beacon committee selections", + reason = exc.getFailureReason() + return sigres + except CancelledError as exc: + debug "Beacon committee selections processing was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while trying to submit beacon " & + "committee selections", reason = exc.msg, error = exc.name + return sigres + + sigres.selectionsReceived = len(sresponse.data) + + for selection in sresponse.data: + let + vindex = selection.validator_index.toValidatorIndex().valueOr: + warn "Invalid validator_index value encountered while processing " & + "beacon committee selections", + validator_index = uint64(selection.validator_index), + reason = $error + continue + selectionProof = selection.selection_proof.load().valueOr: + warn "Invalid signature encountered while processing " & + "beacon committee selections", + validator_index = vindex, slot = selection.slot, + selection_proof = shortLog(selection.selection_proof) + continue + validator = + block: + # Selections operating using validator indices, so we should check + # if we have such validator index in our validator's pool and it + # still in place (not removed using keystore manager). + let key = indexToKey.getOrDefault(vindex) + if key.isNone(): + warn "Non-existing validator encountered while processing " & + "beacon committee selections", + validator_index = vindex, + slot = selection.slot, + selection_proof = shortLog(selection.selection_proof) + continue + vc.attachedValidators[].getValidator(key.get()).valueOr: + notice "Found missing validator while processing " & + "beacon committee selections", validator_index = vindex, + slot = selection.slot, + validator = shortLog(key.get()), + selection_proof = shortLog(selection.selection_proof) + continue + + vc.attesters.withValue(validator.pubkey, map): + map[].duties.withValue(selection.slot.epoch(), dap): + dap[].slotSig = Opt.some(selectionProof.toValidatorSig()) + inc(sigres.selectionsProcessed) + + sigres + +func getIndex*(proof: SyncCommitteeSelectionProof, + inindex: IndexInSyncCommittee): Opt[int] = + if len(proof) == 0: + return Opt.none(int) + for index, value in proof.pairs(): + if value.sync_committee_index == inindex: + return Opt.some(index) + Opt.none(int) + +func hasSignature*(proof: SyncCommitteeSelectionProof, + inindex: IndexInSyncCommittee, + slot: Slot): bool = + let index = proof.getIndex(inindex).valueOr: return false + proof[index].signatures[int(slot.since_epoch_start())].isSome() + +func getSignature*(proof: SyncCommitteeSelectionProof, + inindex: IndexInSyncCommittee, + slot: Slot): Opt[ValidatorSig] = + let index = proof.getIndex(inindex).valueOr: + return Opt.none(ValidatorSig) + proof[index].signatures[int(slot.since_epoch_start())] + +proc setSignature*(proof: var SyncCommitteeSelectionProof, + inindex: IndexInSyncCommittee, slot: Slot, + signature: Opt[ValidatorSig]) = + let index = proof.getIndex(inindex).expect( + "EpochSelectionProof should be present at this moment") + proof[index].signatures[int(slot.since_epoch_start())] = signature + +proc setSyncSelectionProof*(vc: ValidatorClientRef, pubkey: ValidatorPubKey, + inindex: IndexInSyncCommittee, slot: Slot, + duty: SyncCommitteeDuty, + signature: Opt[ValidatorSig]) = + let + proof = + block: + let length = len(duty.validator_sync_committee_indices) + var res = newSeq[EpochSelectionProof](length) + for i in 0 ..< length: + res[i].sync_committee_index = duty.validator_sync_committee_indices[i] + res + + vc.syncCommitteeProofs. + mgetOrPut(slot.epoch(), default(SyncCommitteeProofs)).proofs. + mgetOrPut(pubkey, proof).setSignature(inindex, slot, signature) + +proc getSyncCommitteeSelectionProof*( + vc: ValidatorClientRef, + pubkey: ValidatorPubKey, + epoch: Epoch + ): Opt[SyncCommitteeSelectionProof] = + vc.syncCommitteeProofs.withValue(epoch, epochProofs): + epochProofs[].proofs.withValue(pubkey, validatorProofs): + return Opt.some(validatorProofs[]) + do: + return Opt.none(SyncCommitteeSelectionProof) + do: + return Opt.none(SyncCommitteeSelectionProof) + +proc getSyncCommitteeSelectionProof*( + vc: ValidatorClientRef, + pubkey: ValidatorPubKey, + slot: Slot, + inindex: IndexInSyncCommittee + ): Opt[ValidatorSig] = + vc.syncCommitteeProofs.withValue(slot.epoch(), epochProofs): + epochProofs[].proofs.withValue(pubkey, validatorProofs): + let index = getIndex(validatorProofs[], inindex).valueOr: + return Opt.none(ValidatorSig) + return validatorProofs[][index].signatures[int(slot.since_epoch_start())] + do: + return Opt.none(ValidatorSig) + do: + return Opt.none(ValidatorSig) + +proc getSyncCommitteeDutiesRequests*( + vc: ValidatorClientRef, + start, finish: Slot, + genesisRoot: Eth2Digest + ): seq[SyncCommitteeSlotRequest] = + var res: seq[SyncCommitteeSlotRequest] + for epoch in start.epoch() .. finish.epoch(): + let + fork = vc.forkAtEpoch(epoch) + period = epoch.sync_committee_period() + + for duty in vc.syncDutiesForPeriod(period): + let validator = vc.attachedValidators[].getValidator(duty.pubkey).valueOr: + # Ignore all the validators which are not here anymore + continue + if validator.index.isNone(): + # Ignore all the valididators which do not have index yet. + continue + + let proof = vc.getSyncCommitteeSelectionProof(duty.pubkey, epoch). + get(default(SyncCommitteeSelectionProof)) + + for inindex in duty.validator_sync_committee_indices: + for slot in epoch.slots(): + if slot < start: continue + if slot > finish: break + if proof.hasSignature(inindex, slot): continue + let + future = + getSyncCommitteeSelectionProof(validator, fork, genesisRoot, slot, + getSubcommitteeIndex(inindex)) + req = + SyncCommitteeSlotRequest( + validator: validator, + fork: fork, + slot: slot, + duty: duty, + sync_committee_index: inindex, + sub_committee_index: getSubcommitteeIndex(inindex), + future: FutureBase(future)) + res.add(req) + # We make requests sorted by slot number. + sorted(res, cmp, order = SortOrder.Ascending) + +proc getSyncRequest*( + requests: var openArray[SyncCommitteeSlotRequest], + validator: AttachedValidator, + slot: Slot, + subcommittee_index: uint64 + ): Opt[SyncCommitteeSlotRequest] = + for mreq in requests.mitems(): + if mreq.validator.pubkey == validator.pubkey and + mreq.slot == slot and + mreq.sub_committee_index == subcommittee_index: + return Opt.some(mreq) + Opt.none(SyncCommitteeSlotRequest) + +proc fillSyncCommitteeSelectionProofs*( + vc: ValidatorClientRef, + start, finish: Slot + ): Future[FillSignaturesResult] {.async.} = + let genesisRoot = vc.beaconGenesis.genesis_validators_root + var + requests: seq[SyncCommitteeSlotRequest] + sigres: FillSignaturesResult + + withTimeMetric(client_sync_committee_selection_proof_time): + requests = vc.getSyncCommitteeDutiesRequests(start, finish, genesisRoot) + sigres.signaturesRequested = len(requests) + var pendingRequests = requests.mapIt(it.future) + + while len(pendingRequests) > 0: + try: + discard await race(pendingRequests) + except CancelledError as exc: + var pending: seq[Future[void]] + for future in pendingRequests: + if not(future.finished()): pending.add(future.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + + pendingRequests = + block: + var res: seq[FutureBase] + for mreq in requests.mitems(): + if isNil(mreq.future): continue + if not(mreq.future.finished()): + res.add(mreq.future) + else: + let signature = + if mreq.future.completed(): + let sres = Future[SignatureResult](mreq.future).read() + if sres.isErr(): + warn "Unable to create slot signature using remote signer", + reason = sres.error(), epoch = mreq.slot.epoch(), + slot = mreq.slot + Opt.none(ValidatorSig) + else: + inc(sigres.signaturesReceived) + Opt.some(sres.get()) + else: + Opt.none(ValidatorSig) + + mreq.future = nil + mreq.proof = signature + + if signature.isSome(): + vc.setSyncSelectionProof(mreq.validator.pubkey, + mreq.sync_committee_index, + mreq.slot, mreq.duty, + signature) + res + + if vc.config.distributedEnabled: + withTimeMetric(client_obol_aggregated_sync_committee_selection_proof_time): + let (indexToKey, selections) = + block: + var + res1: Table[ValidatorIndex, Opt[ValidatorPubKey]] + res2: seq[RestSyncCommitteeSelection] + for mreq in requests.mitems(): + if mreq.proof.isSome(): + res1[mreq.validator.index.get()] = Opt.some(mreq.validator.pubkey) + res2.add(RestSyncCommitteeSelection( + validator_index: RestValidatorIndex(mreq.validator.index.get()), + subcommittee_index: uint64(mreq.sub_committee_index), + slot: mreq.slot, selection_proof: mreq.proof.get())) + (res1, res2) + + sigres.selectionsRequested = len(selections) + + if len(selections) == 0: + return sigres + + let sresponse = + try: + # Query middleware for aggregated signatures. + await vc.submitSyncCommitteeSelections(selections, + ApiStrategyKind.Best) + except ValidatorApiError as exc: + warn "Unable to submit sync committee selections", + reason = exc.getFailureReason() + return sigres + except CancelledError as exc: + debug "Sync committee selections processing was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while trying to submit sync " & + "committee selections", reason = exc.msg, error = exc.name + return sigres + + sigres.selectionsReceived = len(sresponse.data) + + for selection in sresponse.data: + let + slot = selection.slot + subcommittee_index = selection.subcommittee_index + vindex = selection.validator_index.toValidatorIndex().valueOr: + warn "Invalid validator_index value encountered while processing " & + "sync committee selections", + validator_index = uint64(selection.validator_index), + reason = $error + continue + selectionProof = selection.selection_proof.load().valueOr: + warn "Invalid signature encountered while processing " & + "sync committee selections", + validator_index = vindex, slot = slot, + selection_proof = shortLog(selection.selection_proof) + continue + validator = + block: + # Selections operating using validator indices, so we should check + # if we have such validator index in our validator's pool and it + # still in place (not removed using keystore manager). + let key = indexToKey.getOrDefault(vindex) + if key.isNone(): + warn "Non-existing validator encountered while processing " & + "sync committee selections", + validator_index = vindex, + slot = slot, + selection_proof = shortLog(selection.selection_proof) + continue + vc.attachedValidators[].getValidator(key.get()).valueOr: + notice "Found missing validator while processing " & + "sync committee selections", validator_index = vindex, + slot = slot, + validator = shortLog(key.get()), + selection_proof = shortLog(selection.selection_proof) + continue + request = + block: + let res = getSyncRequest(requests, validator, slot, + subcommittee_index) + if res.isNone(): + warn "Found sync committee selection proof which was not " & + "requested", + slot = slot, subcommittee_index = subcommittee_index, + validator = shortLog(validator), + selection_proof = shortLog(selection.selection_proof) + continue + res.get() + + vc.syncCommitteeProofs.withValue(slot.epoch(), epochProofs): + epochProofs[].proofs.withValue(validator.pubkey, signatures): + signatures[].setSignature(request.sync_committee_index, + selection.slot, + Opt.some(selection.selectionProof)) + inc(sigres.selectionsProcessed) + sigres diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index 3906d94fd..b83f1752c 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -11,7 +11,7 @@ import ../spec/datatypes/[phase0, altair, bellatrix], ../spec/eth2_apis/rest_types, ../validators/activity_metrics, - "."/[common, api] + "."/[common, api, selection_proofs] const ServiceName = "sync_committee_service" diff --git a/tests/test_validator_client.nim b/tests/test_validator_client.nim index 6ea80b58f..c4d0f2850 100644 --- a/tests/test_validator_client.nim +++ b/tests/test_validator_client.nim @@ -144,7 +144,163 @@ const ("", "err(Missing hostname)") ] + ObolBeaconRequestTestVector = """ +[ + { + "validator_index": "1", + "slot": "1", + "selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "slot": "2", + "validator_index": "2", + "selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "3", + "selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505", + "slot": "3" + }, + { + "selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505", + "validator_index": "4", + "slot": "4" + } +]""" + ObolBeaconResponseTestVector = """ +{ + "data": [ + { + "validator_index": "1", + "slot": "1", + "selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "2", + "slot": "2", + "selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "3", + "slot": "3", + "selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "4", + "slot": "4", + "selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + } + ] +}""" + ObolBeaconResponseTestVectorObject = [ + ( + validator_index: RestValidatorIndex(1), + slot: Slot(1), + selection_proof: "1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ), + ( + validator_index: RestValidatorIndex(2), + slot: Slot(2), + selection_proof: "2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ), + ( + validator_index: RestValidatorIndex(3), + slot: Slot(3), + selection_proof: "3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ), + ( + validator_index: RestValidatorIndex(4), + slot: Slot(4), + selection_proof: "4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ) + ] + ObolSyncRequestTestVector = """ +[ + { + "validator_index": "1", + "slot": "1", + "subcommittee_index": "1", + "selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "2", + "subcommittee_index": "2", + "slot": "2", + "selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "subcommittee_index": "3", + "validator_index": "3", + "slot": "3", + "selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "4", + "slot": "4", + "selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505", + "subcommittee_index": "4" + } +]""" + ObolSyncResponseTestVector = """ +{ + "data": [ + { + "validator_index": "1", + "slot": "1", + "subcommittee_index": "1", + "selection_proof": "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "2", + "subcommittee_index": "2", + "slot": "2", + "selection_proof": "0x2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "subcommittee_index": "3", + "validator_index": "3", + "slot": "3", + "selection_proof": "0x3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + }, + { + "validator_index": "4", + "slot": "4", + "selection_proof": "0x4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505", + "subcommittee_index": "4" + } + ] +}""" + ObolSyncResponseTestVectorObject = [ + ( + validator_index: RestValidatorIndex(1), + slot: Slot(1), + subcommittee_index: 1'u64, + selection_proof: "1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ), + ( + validator_index: RestValidatorIndex(2), + slot: Slot(2), + subcommittee_index: 2'u64, + selection_proof: "2b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ), + ( + validator_index: RestValidatorIndex(3), + slot: Slot(3), + subcommittee_index: 3'u64, + selection_proof: "3b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ), + ( + validator_index: RestValidatorIndex(4), + slot: Slot(4), + subcommittee_index: 4'u64, + selection_proof: "4b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505" + ) + ] + type + TestDecodeTypes = seq[RestBeaconCommitteeSelection] | + seq[RestSyncCommitteeSelection] + AttestationDataTuple* = tuple[ slot: uint64, index: uint64, @@ -238,12 +394,227 @@ proc createRootsSeen( res suite "Validator Client test suite": + + proc decodeBytes[T: TestDecodeTypes]( + t: typedesc[T], + value: openArray[byte], + contentType: Opt[ContentTypeData] = Opt.none(ContentTypeData) + ): RestResult[T] = + let mediaType = + if contentType.isNone(): + ApplicationJsonMediaType + else: + if isWildCard(contentType.get().mediaType): + return err("Incorrect Content-Type") + contentType.get().mediaType + + if mediaType == ApplicationJsonMediaType: + try: + ok RestJson.decode(value, T, + requireAllFields = true, + allowUnknownFields = true) + except SerializationError as exc: + err("Serialization error") + else: + err("Content-Type not supported") + + proc submitBeaconCommitteeSelectionsPlain( + body: seq[RestBeaconCommitteeSelection] + ): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/beacon_committee_selections", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/submitBeaconCommitteeSelections + + proc submitSyncCommitteeSelectionsPlain( + body: seq[RestSyncCommitteeSelection] + ): RestPlainResponse {. + rest, endpoint: "/eth/v1/validator/sync_committee_selections", + meth: MethodPost.} + ## https://ethereum.github.io/beacon-APIs/#/Validator/submitSyncCommitteeSelections + + proc createServer(address: TransportAddress, + process: HttpProcessCallback, secure: bool): HttpServerRef = + let + socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr} + res = HttpServerRef.new(address, process, socketFlags = socketFlags) + res.get() + test "normalizeUri() test vectors": for hostname in HostNames: for vector in GoodTestVectors: let expect = vector[1] % (hostname) check $normalizeUri(parseUri(vector[0] % (hostname))) == expect + asyncTest "/eth/v1/validator/beacon_committee_selections " & + "serialization/deserialization test": + var clientRequest: seq[byte] + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isOk(): + let request = r.get() + case request.uri.path + of "/eth/v1/validator/beacon_committee_selections": + clientRequest = await request.getBody() + let headers = HttpTable.init([("Content-Type", "application/json")]) + return await request.respond(Http200, ObolBeaconResponseTestVector, + headers) + else: + return await request.respond(Http404, "Page not found") + else: + return dumbResponse() + + let server = createServer(initTAddress("127.0.0.1:0"), process, false) + server.start() + defer: + await server.stop() + await server.closeWait() + + let + serverAddress = server.instance.localAddress + flags = {RestClientFlag.CommaSeparatedArray} + remoteUri = "http://" & $serverAddress + client = + block: + let res = RestClientRef.new(remoteUri, flags = flags) + check res.isOk() + res.get() + selections = + block: + let res = decodeBytes( + seq[RestBeaconCommitteeSelection], + ObolBeaconRequestTestVector.toOpenArrayByte( + 0, len(ObolBeaconRequestTestVector) - 1)) + check res.isOk() + res.get() + + defer: + await client.closeWait() + + let resp = await client.submitBeaconCommitteeSelectionsPlain(selections) + check: + resp.status == 200 + resp.contentType == MediaType.init("application/json") + + let request = + block: + let res = decodeBytes( + seq[RestBeaconCommitteeSelection], + clientRequest) + check res.isOk() + res.get() + + let response = block: + let res = decodeBytes(SubmitBeaconCommitteeSelectionsResponse, + resp.data, resp.contentType) + check res.isOk() + res.get() + + check: + len(request) == len(selections) + len(response.data) == len(ObolBeaconResponseTestVectorObject) + + # Checking response + for index, item in response.data.pairs(): + check: + item.validator_index == + ObolBeaconResponseTestVectorObject[index].validator_index + item.slot == + ObolBeaconResponseTestVectorObject[index].slot + item.selection_proof.toHex() == + ObolBeaconResponseTestVectorObject[index].selection_proof + + # Checking request + for index, item in selections.pairs(): + check: + item.validator_index == request[index].validator_index + item.slot == request[index].slot + item.selection_proof.toHex() == request[index].selection_proof.toHex() + + asyncTest "/eth/v1/validator/sync_committee_selections " & + "serialization/deserialization test": + var clientRequest: seq[byte] + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isOk(): + let request = r.get() + case request.uri.path + of "/eth/v1/validator/sync_committee_selections": + clientRequest = await request.getBody() + let headers = HttpTable.init([("Content-Type", "application/json")]) + return await request.respond(Http200, ObolSyncResponseTestVector, + headers) + else: + return await request.respond(Http404, "Page not found") + else: + return dumbResponse() + + let server = createServer(initTAddress("127.0.0.1:0"), process, false) + server.start() + defer: + await server.stop() + await server.closeWait() + + let + serverAddress = server.instance.localAddress + flags = {RestClientFlag.CommaSeparatedArray} + remoteUri = "http://" & $serverAddress + client = + block: + let res = RestClientRef.new(remoteUri, flags = flags) + check res.isOk() + res.get() + selections = + block: + let res = decodeBytes( + seq[RestSyncCommitteeSelection], + ObolSyncRequestTestVector.toOpenArrayByte( + 0, len(ObolSyncRequestTestVector) - 1)) + check res.isOk() + res.get() + + defer: + await client.closeWait() + + let resp = await client.submitSyncCommitteeSelectionsPlain(selections) + check: + resp.status == 200 + resp.contentType == MediaType.init("application/json") + + let request = + block: + let res = decodeBytes( + seq[RestSyncCommitteeSelection], + clientRequest) + check res.isOk() + res.get() + + let response = block: + let res = decodeBytes(SubmitSyncCommitteeSelectionsResponse, + resp.data, resp.contentType) + check res.isOk() + res.get() + + check: + len(request) == len(selections) + len(response.data) == len(ObolSyncResponseTestVectorObject) + + # Checking response + for index, item in response.data.pairs(): + check: + item.validator_index == + ObolSyncResponseTestVectorObject[index].validator_index + item.slot == + ObolSyncResponseTestVectorObject[index].slot + item.selection_proof.toHex() == + ObolSyncResponseTestVectorObject[index].selection_proof + item.subcommittee_index == request[index].subcommittee_index + + # Checking request + for index, item in selections.pairs(): + check: + item.validator_index == request[index].validator_index + item.slot == request[index].slot + item.subcommittee_index == request[index].subcommittee_index + item.selection_proof.toHex() == request[index].selection_proof.toHex() + test "getAttestationDataScore() test vectors": for vector in AttestationDataVectors: let