diff --git a/beacon_chain/rpc/rest_utils.nim b/beacon_chain/rpc/rest_utils.nim index 5033af075..44cb71a2b 100644 --- a/beacon_chain/rpc/rest_utils.nim +++ b/beacon_chain/rpc/rest_utils.nim @@ -20,10 +20,6 @@ export options, eth2_rest_serialization, blockchain_dag, presto, rest_types, rest_constants -type - ValidatorIndexError* {.pure.} = enum - UnsupportedValue, TooHighValue - func match(data: openArray[char], charset: set[char]): int = for ch in data: if ch notin charset: @@ -203,26 +199,6 @@ template strData*(body: ContentBody): string = bind fromBytes string.fromBytes(body.data) -func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex, - ValidatorIndexError] = - when sizeof(ValidatorIndex) == 4: - if uint64(value) < VALIDATOR_REGISTRY_LIMIT: - # On x86 platform Nim allows only `int32` indexes, so all the indexes in - # range `2^31 <= x < 2^32` are not supported. - if uint64(value) <= uint64(high(int32)): - ok(ValidatorIndex(value)) - else: - err(ValidatorIndexError.UnsupportedValue) - else: - err(ValidatorIndexError.TooHighValue) - elif sizeof(ValidatorIndex) == 8: - if uint64(value) < VALIDATOR_REGISTRY_LIMIT: - ok(ValidatorIndex(value)) - else: - err(ValidatorIndexError.TooHighValue) - else: - doAssert(false, "ValidatorIndex type size is incorrect") - func syncCommitteeParticipants*(forkedState: ForkedHashedBeaconState, epoch: Epoch ): Result[seq[ValidatorPubKey], cstring] = diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index 2eac7e056..8246d8e02 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -68,6 +68,9 @@ type ValidatorQueryKind* {.pure.} = enum Index, Key + ValidatorIndexError* {.pure.} = enum + UnsupportedValue, TooHighValue + ValidatorIdent* = object case kind*: ValidatorQueryKind of ValidatorQueryKind.Index: @@ -1009,3 +1012,23 @@ func init*(t: typedesc[RestSignedContributionAndProof], message.contribution ), signature: signature) + +func toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex, + ValidatorIndexError] = + when sizeof(ValidatorIndex) == 4: + if uint64(value) < VALIDATOR_REGISTRY_LIMIT: + # On x86 platform Nim allows only `int32` indexes, so all the indexes in + # range `2^31 <= x < 2^32` are not supported. + if uint64(value) <= uint64(high(int32)): + ok(ValidatorIndex(value)) + else: + err(ValidatorIndexError.UnsupportedValue) + else: + err(ValidatorIndexError.TooHighValue) + elif sizeof(ValidatorIndex) == 8: + if uint64(value) < VALIDATOR_REGISTRY_LIMIT: + ok(ValidatorIndex(value)) + else: + err(ValidatorIndexError.TooHighValue) + else: + doAssert(false, "ValidatorIndex type size is incorrect") diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 9ca89ff25..f5dace362 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -6,7 +6,7 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. import - std/[tables, os, sets, sequtils, strutils, uri], + std/[tables, os, sets, sequtils, strutils, uri, algorithm], stew/[base10, results, byteutils], bearssl/rand, chronos, presto, presto/client as presto_client, chronicles, confutils, @@ -60,6 +60,8 @@ type client*: ValidatorClientRef DutiesServiceRef* = ref object of ClientServiceRef + pollingAttesterDutiesTask*: Future[void] + pollingSyncDutiesTask*: Future[void] FallbackServiceRef* = ref object of ClientServiceRef changesEvent*: AsyncEvent @@ -81,10 +83,14 @@ type data*: RestAttesterDuty slotSig*: Opt[ValidatorSig] - SyncCommitteeDuty* = object - pubkey*: ValidatorPubKey - validator_index*: ValidatorIndex - validator_sync_committee_index*: IndexInSyncCommittee + SyncCommitteeDuty* = RestSyncCommitteeDuty + + SlotProofsArray* = array[SLOTS_PER_EPOCH, Opt[ValidatorSig]] + + SyncDutyAndProof* = object + epoch*: Epoch + data*: SyncCommitteeDuty + slotSigs*: Table[SyncSubcommitteeIndex, SlotProofsArray] SyncCommitteeSubscriptionInfo* = object validator_index*: ValidatorIndex @@ -122,7 +128,7 @@ type duties*: Table[Epoch, DutyAndProof] EpochSyncDuties* = object - duties*: Table[Epoch, SyncCommitteeDuty] + duties*: Table[Epoch, SyncDutyAndProof] RestBeaconNodeStatus* {.pure.} = enum Offline, ## BN is offline. @@ -211,6 +217,7 @@ type const DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH) + DefaultSyncDutyAndProof* = SyncDutyAndProof(epoch: FAR_FUTURE_EPOCH) SlotDuration* = int64(SECONDS_PER_SLOT).seconds OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT AllBeaconNodeRoles* = { @@ -394,7 +401,7 @@ chronicles.expandIt(RestAttesterDuty): chronicles.expandIt(SyncCommitteeDuty): pubkey = shortLog(it.pubkey) validator_index = it.validator_index - validator_sync_committee_index = it.validator_sync_committee_index + validator_sync_committee_indices = it.validator_sync_committee_indices proc checkConfig*(info: RestSpecVC): bool = # /!\ Keep in sync with `spec/eth2_apis/rest_types.nim` > `RestSpecVC`. @@ -500,11 +507,14 @@ proc stop*(csr: ClientServiceRef) {.async.} = csr.state = ServiceState.Closed debug "Service stopped", service = csr.name -proc isDefault*(dap: DutyAndProof): bool = - dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64) +func isDefault*(dap: DutyAndProof): bool = + dap.epoch == FAR_FUTURE_EPOCH -proc isDefault*(prd: ProposedData): bool = - prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64) +func isDefault*(prd: ProposedData): bool = + prd.epoch == FAR_FUTURE_EPOCH + +func isDefault*(sdap: SyncDutyAndProof): bool = + sdap.epoch == FAR_FUTURE_EPOCH proc parseRoles*(data: string): Result[set[BeaconNodeRole], cstring] = var res: set[BeaconNodeRole] @@ -622,6 +632,10 @@ proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest, DutyAndProof(epoch: epoch, dependentRoot: dependentRoot, data: duty, slotSig: slotSig) +proc init*(t: typedesc[SyncDutyAndProof], epoch: Epoch, + duty: SyncCommitteeDuty): SyncDutyAndProof = + SyncDutyAndProof(epoch: epoch, data: duty) + proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest, data: openArray[ProposerTask]): ProposedData = ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data) @@ -638,17 +652,16 @@ proc getAttesterDutiesForSlot*(vc: ValidatorClientRef, ## Returns all `DutyAndProof` for the given `slot`. var res: seq[DutyAndProof] let epoch = slot.epoch() - for key, item in vc.attesters: - let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof) - if not(duty.isDefault()): - if duty.data.slot == slot: - res.add(duty) + for key, item in mpairs(vc.attesters): + item.duties.withValue(epoch, duty): + if duty[].data.slot == slot: + res.add(duty[]) res proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef, - slot: Slot): seq[SyncCommitteeDuty] = - ## Returns all `SyncCommitteeDuty` for the given `slot`. - var res: seq[SyncCommitteeDuty] + slot: Slot): seq[SyncDutyAndProof] = + ## Returns all `SyncDutyAndProof` for the given `slot`. + var res: seq[SyncDutyAndProof] let epoch = slot.epoch() for key, item in mpairs(vc.syncCommitteeDuties): item.duties.withValue(epoch, duty): @@ -698,24 +711,26 @@ iterator attesterDutiesForEpoch*(vc: ValidatorClientRef, if not(isDefault(epochDuties)): yield epochDuties +iterator syncDutiesForEpoch*(vc: ValidatorClientRef, + epoch: Epoch): SyncDutyAndProof = + for key, item in vc.syncCommitteeDuties: + let epochDuties = item.duties.getOrDefault(epoch) + if not(isDefault(epochDuties)): + yield epochDuties + proc syncMembersSubscriptionInfoForEpoch*( vc: ValidatorClientRef, - epoch: Epoch): seq[SyncCommitteeSubscriptionInfo] = + epoch: Epoch + ): seq[SyncCommitteeSubscriptionInfo] = var res: seq[SyncCommitteeSubscriptionInfo] - for key, item in mpairs(vc.syncCommitteeDuties): - var cur: SyncCommitteeSubscriptionInfo - var initialized = false - - item.duties.withValue(epoch, epochDuties): - if not initialized: - cur.validator_index = epochDuties.validator_index - initialized = true - cur.validator_sync_committee_indices.add( - epochDuties.validator_sync_committee_index) - - if initialized: - res.add cur - + for mitem in mvalues(vc.syncCommitteeDuties): + mitem.duties.withValue(epoch, epochDuties): + res.add( + SyncCommitteeSubscriptionInfo( + validator_index: + epochDuties[].data.validator_index, + validator_sync_committee_indices: + epochDuties[].data.validator_sync_committee_indices)) res proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff = @@ -851,19 +866,6 @@ proc isExpired*(vc: ValidatorClientRef, else: true -func getSelections*( - daps: openArray[DutyAndProof] - ): seq[RestBeaconCommitteeSelection] = - var res: seq[RestBeaconCommitteeSelection] - for item in daps: - if item.slotSig.isSome(): - res.add(RestBeaconCommitteeSelection( - validator_index: RestValidatorIndex(item.data.validator_index), - slot: item.data.slot, - selection_proof: item.slotSig.get() - )) - res - proc getValidatorRegistration( vc: ValidatorClientRef, validator: AttachedValidator, @@ -1097,3 +1099,29 @@ proc checkedWaitForNextSlot*(vc: ValidatorClientRef, curSlot: Opt[Slot], nextSlot = currentSlot + 1 vc.checkedWaitForSlot(nextSlot, offset, showLogs) + +func cmpUnsorted*[T](a, b: openArray[T]): bool = + if len(a) != len(b): return false + return + case len(a) + of 0: + true + of 1: + a[0] == b[0] + of 2: + ((a[0] == b[0]) and (a[1] == b[1])) or + ((a[0] == b[1]) and (a[1] == b[0])) + else: + let asorted = sorted(a) + let bsorted = sorted(b) + for index, item in asorted.pairs(): + if item != bsorted[index]: + return false + true + +func `==`*(a, b: SyncDutyAndProof): bool = + (a.epoch == b.epoch) and + (a.data.pubkey == b.data.pubkey) and + (a.data.validator_index == b.data.validator_index) and + cmpUnsorted(a.data.validator_sync_committee_indices, + b.data.validator_sync_committee_indices) diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index e8dcb2ff2..664c8b491 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -5,7 +5,7 @@ # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. -import std/[sets, sequtils] +import std/[sets, sequtils, algorithm] import chronicles import common, api, block_service @@ -20,6 +20,22 @@ type AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop, ProposerPreparationLoop, ValidatorRegisterLoop + SyncIndexTable = Table[SyncSubcommitteeIndex, SlotProofsArray] + + SyncValidatorAndDuty = object + validator: AttachedValidator + duty: SyncDutyAndProof + + AttestationSlotRequest = object + validator: AttachedValidator + slot: Slot + + SyncSlotRequest = object + validator: AttachedValidator + slot: Slot + validatorSyncCommitteeIndex: IndexInSyncCommittee + validatorSubCommitteeIndex: SyncSubcommitteeIndex + chronicles.formatIt(DutiesServiceLoop): case it of AttesterLoop: "attester_loop" @@ -29,6 +45,9 @@ chronicles.formatIt(DutiesServiceLoop): of ProposerPreparationLoop: "proposer_prepare_loop" of ValidatorRegisterLoop: "validator_register_loop" +proc cmp(x, y: AttestationSlotRequest): int = + if x.slot == y.slot: 0 elif x.slot < y.slot: -1 else: 1 + proc checkDuty(duty: RestAttesterDuty): bool = (duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and (uint64(duty.committee_index) < MAX_COMMITTEES_PER_SLOT) and @@ -109,6 +128,147 @@ proc pollForValidatorIndices*(service: DutiesServiceRef) {.async.} = updated_validators = updated vc.indicesAvailable.fire() +proc fillAttestationSlotSignatures*( + service: DutiesServiceRef, + epochPeriods: seq[Epoch] + ) {.async.} = + let + vc = service.client + genesisRoot = vc.beaconGenesis.genesis_validators_root + requests = + block: + var res: seq[AttestationSlotRequest] + for epoch in epochPeriods: + for duty in vc.attesterDutiesForEpoch(epoch): + if duty.slotSig.isNone(): + let validator = + vc.attachedValidators[]. + getValidator(duty.data.pubkey).valueOr: + continue + res.add(AttestationSlotRequest( + validator: validator, + slot: duty.data.slot + )) + # We make requests sorted by slot number. + sorted(res, cmp, order = SortOrder.Ascending) + pendingRequests = + block: + var res: seq[Future[SignatureResult]] + for request in requests: + let fork = vc.forkAtEpoch(request.slot.epoch()) + res.add(request.validator.getSlotSignature(fork, genesisRoot, + request.slot)) + res + + try: + # TODO (cheatfate): Here we waiting for all signatures, but when remote + # signer is used we could try different approach. + await allFutures(pendingRequests) + except CancelledError as exc: + var pending: seq[Future[void]] + for future in pendingRequests: + if not(future.finished()): pending.add(future.cancelAndWait()) + await allFutures(pending) + raise exc + + for index, fut in pendingRequests.pairs(): + let + request = requests[index] + signature = + if fut.done(): + let sres = fut.read() + if sres.isErr(): + warn "Unable to create slot signature using remote signer", + reason = sres.error(), epoch = request.slot.epoch(), + slot = request.slot + Opt.none(ValidatorSig) + else: + Opt.some(sres.get()) + else: + Opt.none(ValidatorSig) + + vc.attesters.withValue(request.validator.pubkey, map): + map[].duties.withValue(request.slot.epoch(), dap): + dap[].slotSig = signature + + if vc.config.distributedEnabled: + var indexToKey: Table[ValidatorIndex, Opt[ValidatorPubKey]] + let selections = + block: + var sres: seq[RestBeaconCommitteeSelection] + for epoch in epochPeriods: + for duty in vc.attesterDutiesForEpoch(epoch): + # We only use duties which has slot signature filled, because + # middleware needs it to create aggregated signature. + if duty.slotSig.isSome(): + let + validator = vc.attachedValidators[].getValidator( + duty.data.pubkey).valueOr: + continue + vindex = validator.index.valueOr: + continue + indexToKey[vindex] = Opt.some(validator.pubkey) + sres.add(RestBeaconCommitteeSelection( + validator_index: RestValidatorIndex(vindex), + slot: duty.data.slot, + selection_proof: duty.slotSig.get() + )) + sres + + if len(selections) == 0: return + + let sresponse = + try: + # Query middleware for aggregated signatures. + await vc.submitBeaconCommitteeSelections(selections, + ApiStrategyKind.Best) + except ValidatorApiError as exc: + warn "Unable to submit beacon committee selections", + reason = exc.getFailureReason() + return + except CancelledError as exc: + debug "Beacon committee selections processing was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while trying to submit beacon " & + "committee selections", reason = exc.msg, error = exc.name + return + + for selection in sresponse.data: + let + vindex = selection.validator_index.toValidatorIndex().valueOr: + warn "Invalid validator_index value encountered while processing " & + "beacon committee selections", + validator_index = uint64(selection.validator_index), + reason = $error + continue + selectionProof = selection.selection_proof.load().valueOr: + warn "Invalid signature encountered while processing " & + "beacon committee selections", + validator_index = vindex, + slot = selection.slot, + selection_proof = shortLog(selection.selection_proof) + continue + validator = + block: + # Selections operating using validator indices, so we should check + # if we have such validator index in our validator's pool and it + # still in place (not removed using keystore manager). + let key = indexToKey.getOrDefault(vindex) + if key.isNone(): + warn "Non-existing validator encountered while processing " & + "beacon committee selections", + validator_index = vindex, + slot = selection.slot, + selection_proof = shortLog(selection.selection_proof) + continue + vc.attachedValidators[].getValidator(key.get()).valueOr: + continue + + vc.attesters.withValue(validator.pubkey, map): + map[].duties.withValue(selection.slot.epoch(), dap): + dap[].slotSig = Opt.some(selectionProof.toValidatorSig()) + proc pollForAttesterDuties*(service: DutiesServiceRef, epoch: Epoch): Future[int] {.async.} = let vc = service.client @@ -118,7 +278,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef, return 0 var duties: seq[RestAttesterDuty] - var currentRoot: Option[Eth2Digest] + var currentRoot: Opt[Eth2Digest] var offset = 0 while offset < len(validatorIndices): @@ -151,7 +311,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef, if currentRoot.isNone(): # First request - currentRoot = some(res.dependent_root) + currentRoot = Opt.some(res.dependent_root) else: if currentRoot.get() != res.dependent_root: # `dependent_root` must be equal for all requests/response, if it got @@ -159,7 +319,7 @@ proc pollForAttesterDuties*(service: DutiesServiceRef, # should re-request all queries again. offset = 0 duties.setLen(0) - currentRoot = none[Eth2Digest]() + currentRoot = Opt.none(Eth2Digest) continue for item in res.data: @@ -194,108 +354,11 @@ proc pollForAttesterDuties*(service: DutiesServiceRef, res.add((epoch, duty)) res - if len(addOrReplaceItems) > 0: - var pendingRequests: seq[Future[SignatureResult]] - var validators: seq[AttachedValidator] - for item in addOrReplaceItems: - let - validator = - vc.attachedValidators[].getValidator(item.duty.pubkey).valueOr: - continue - fork = vc.forkAtEpoch(item.duty.slot.epoch) - future = validator.getSlotSignature(fork, genesisRoot, item.duty.slot) - pendingRequests.add(future) - validators.add(validator) - - try: - await allFutures(pendingRequests) - except CancelledError as exc: - var pendingCancel: seq[Future[void]] - for future in pendingRequests: - if not(future.finished()): - pendingCancel.add(future.cancelAndWait()) - await allFutures(pendingCancel) - raise exc - - let daps = - block: - var res: seq[DutyAndProof] - for index, fut in pendingRequests: - let - item = addOrReplaceItems[index] - dap = - if fut.done(): - let sigRes = fut.read() - if sigRes.isErr(): - warn "Unable to create slot signature using remote signer", - validator = shortLog(validators[index]), - error_msg = sigRes.error() - DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, - none[ValidatorSig]()) - else: - DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, - some(sigRes.get())) - else: - DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, - none[ValidatorSig]()) - res.add(dap) - res - - for item in daps: - # Update VC attesters registry with current version of DutyAndProof. - vc.attesters.mgetOrPut(item.data.pubkey, - default(EpochDuties)).duties[item.epoch] = item - - if vc.config.distributedEnabled: - let selections = daps.getSelections() - if len(selections) == 0: - return len(addOrReplaceItems) - - let sresponse = - try: - # Query middleware for aggregated signatures. - await vc.submitBeaconCommitteeSelections(selections, - ApiStrategyKind.Best) - except ValidatorApiError as exc: - warn "Unable to submit beacon committee selections", epoch = epoch, - reason = exc.getFailureReason() - return 0 - except CancelledError as exc: - debug "Beacon committee selections processing was interrupted" - raise exc - except CatchableError as exc: - error "Unexpected error occured while trying to submit beacon " & - "committee selections", epoch = epoch, err_name = exc.name, - err_msg = exc.msg - return 0 - - for selection in sresponse.data: - let selectionProof = selection.selection_proof.load().valueOr: - warn "Invalid signature encountered while processing beacon " & - "committee selections", - validator_index = ValidatorIndex(selection.validator_index), - slot = selection.slot, - selection_proof = shortLog(selection.selection_proof) - continue - - let dres = - block: - var res: Opt[DutyAndProof] - for dap in daps: - if (uint64(dap.data.validator_index) == - uint64(selection.validator_index)) and - (dap.data.slot == selection.slot): - var ndap = dap - ndap.slotSig = some(selection.selection_proof) - res = Opt.some(ndap) - break - res - - if dres.isSome(): - # Update VC attesters registry with new aggregated DutyAndProof. - let dap = dres.get() - vc.attesters.mgetOrPut( - dap.data.pubkey, default(EpochDuties)).duties[dap.epoch] = dap + for item in addOrReplaceItems: + let dap = DutyAndProof.init(item.epoch, currentRoot.get(), item.duty, + Opt.none(ValidatorSig)) + vc.attesters.mgetOrPut(dap.data.pubkey, + default(EpochDuties)).duties[dap.epoch] = dap return len(addOrReplaceItems) @@ -312,6 +375,174 @@ proc pruneSyncCommitteeDuties*(service: DutiesServiceRef, slot: Slot) = newSyncCommitteeDuties[key] = currentPeriodDuties vc.syncCommitteeDuties = newSyncCommitteeDuties +proc fillSyncSlotSignatures*( + service: DutiesServiceRef, + epochPeriods: seq[Epoch] + ) {.async.} = + let + vc = service.client + genesisRoot = vc.beaconGenesis.genesis_validators_root + validatorDuties = + block: + var res: seq[SyncValidatorAndDuty] + for epoch in epochPeriods: + for duty in vc.syncDutiesForEpoch(epoch): + if len(duty.slotSigs) == 0: + let validator = vc.attachedValidators[].getValidator( + duty.data.pubkey).valueOr: + continue + res.add( + SyncValidatorAndDuty( + validator: validator, + duty: duty)) + res + (pendingRequests, pendingIds) = + block: + var + sres: seq[Future[SignatureResult]] + ires: seq[SyncSlotRequest] + for epoch in epochPeriods: + let fork = vc.forkAtEpoch(epoch) + for slot in epoch.slots(): + for item in validatorDuties: + for syncCommitteeIndex in + item.duty.data.validator_sync_committee_indices: + let subCommitteeIndex = getSubcommitteeIndex(syncCommitteeIndex) + sres.add( + item.validator.getSyncCommitteeSelectionProof( + fork, genesisRoot, slot, subCommitteeIndex)) + ires.add( + SyncSlotRequest( + validator: item.validator, + slot: slot, + validatorSyncCommitteeIndex: syncCommitteeIndex, + validatorSubCommitteeIndex: subCommitteeIndex)) + (sres, ires) + + try: + # TODO (cheatfate): Here we waiting for all signature requests, but we could + # perform waiting slot by slot, so all the tasks from early slot could + # start working without delays. + await allFutures(pendingRequests) + except CancelledError as exc: + var pending: seq[Future[void]] + for future in pendingRequests: + if not(future.finished()): pending.add(future.cancelAndWait()) + await allFutures(pending) + raise exc + + for index, fut in pendingRequests.pairs(): + let + pid = pendingIds[index] + epoch = pid.slot.epoch() + slotIndex = pid.slot - epoch.start_slot() + signature = + if fut.done(): + let sres = fut.read() + if sres.isErr(): + warn "Unable to create slot proof using remote signer", + reason = sres.error(), epoch = epoch, + slot = pid.slot, slot_index = slotIndex, + validator = shortLog(pid.validator) + Opt.none(ValidatorSig) + else: + Opt.some(sres.get()) + else: + Opt.none(ValidatorSig) + + vc.syncCommitteeDuties.withValue(pid.validator.pubkey, map): + map[].duties.withValue(epoch, sdap): + sdap[].slotSigs.withValue(pid.validatorSubCommitteeIndex, proofs): + proofs[][slotIndex] = signature + + if vc.config.distributedEnabled: + var indexToKey: Table[ValidatorIndex, Opt[ValidatorPubKey]] + + let selections = + block: + var sres: seq[RestSyncCommitteeSelection] + for epoch in epochPeriods: + for duty in vc.syncDutiesForEpoch(epoch): + let + validator = vc.attachedValidators[].getValidator( + duty.data.pubkey).valueOr: + continue + vindex = validator.index.valueOr: + continue + startSlot = duty.epoch.start_slot() + indexToKey[vindex] = Opt.some(validator.pubkey) + for subCommitteeIndex, proofs in duty.slotSigs.pairs(): + for slotIndex, selection_proof in proofs.pairs(): + if selection_proof.isNone(): continue + sres.add(RestSyncCommitteeSelection( + validator_index: RestValidatorIndex(vindex), + slot: startSlot + slotIndex, + subcommittee_index: uint64(subCommitteeIndex), + selection_proof: selection_proof.get() + )) + sres + + if len(selections) == 0: return + + let sresponse = + try: + # Query middleware for aggregated signatures. + await vc.submitSyncCommitteeSelections(selections, + ApiStrategyKind.Best) + except ValidatorApiError as exc: + warn "Unable to submit sync committee selections", + reason = exc.getFailureReason() + return + except CancelledError as exc: + debug "Beacon committee selections processing was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while trying to submit sync " & + "committee selections", reason = exc.msg, error = exc.name + return + + for selection in sresponse.data: + let + vindex = selection.validator_index.toValidatorIndex().valueOr: + warn "Invalid validator_index value encountered while processing " & + "sync committee selections", + validator_index = uint64(selection.validator_index), + reason = $error + continue + selectionProof = selection.selection_proof.load().valueOr: + warn "Invalid signature encountered while processing " & + "sync committee selections", + validator_index = vindex, + slot = selection.slot, + subcommittee_index = selection.subcommittee_index, + selection_proof = shortLog(selection.selection_proof) + continue + epoch = selection.slot.epoch() + slotIndex = selection.slot - epoch.start_slot() + # Position in our slot_proofs array + subCommitteeIndex = SyncSubcommitteeIndex(selection.subcommittee_index) + validator = + block: + # Selections operating using validator indices, so we should check + # if we have such validator index in our validator's pool and it + # still in place (not removed using keystore manager). + let key = indexToKey.getOrDefault(vindex) + if key.isNone(): + warn "Non-existing validator encountered while processing " & + "sync committee selections", + validator_index = vindex, + slot = selection.slot, + subcommittee_index = selection.subcommittee_index, + selection_proof = shortLog(selection.selection_proof) + continue + vc.attachedValidators[].getValidator(key.get()).valueOr: + continue + + vc.syncCommitteeDuties.withValue(validator.pubkey, map): + map[].duties.withValue(epoch, sdap): + sdap[].slotSigs.withValue(subCommitteeIndex, proofs): + proofs[][slotIndex] = Opt.some(selectionProof.toValidatorSig()) + proc pollForSyncCommitteeDuties*(service: DutiesServiceRef, epoch: Epoch): Future[int] {.async.} = let vc = service.client @@ -351,45 +582,30 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef, remainingItems -= arraySize let - relevantDuties = - block: - var res: seq[SyncCommitteeDuty] - for duty in filteredDuties: - for validatorSyncCommitteeIndex in - duty.validator_sync_committee_indices: - res.add(SyncCommitteeDuty( - pubkey: duty.pubkey, - validator_index: duty.validator_index, - validator_sync_committee_index: validatorSyncCommitteeIndex)) - res - + relevantSdaps = filteredDuties.mapIt(SyncDutyAndProof.init(epoch, it)) fork = vc.forkAtEpoch(epoch) - let addOrReplaceItems = - block: - var alreadyWarned = false - var res: seq[tuple[epoch: Epoch, duty: SyncCommitteeDuty]] - for duty in relevantDuties: - var dutyFound = false - - vc.syncCommitteeDuties.withValue(duty.pubkey, map): - map.duties.withValue(epoch, epochDuty): - if epochDuty[] != duty: - dutyFound = true - - if dutyFound and not alreadyWarned: - info "Sync committee duties re-organization", duty, epoch - alreadyWarned = true - - res.add((epoch, duty)) - res + addOrReplaceItems = + block: + var + alreadyWarned = false + res: seq[tuple[epoch: Epoch, duty: SyncDutyAndProof]] + for sdap in relevantSdaps: + var dutyFound = false + vc.syncCommitteeDuties.withValue(sdap.data.pubkey, map): + map.duties.withValue(epoch, epochDuty): + if epochDuty[] != sdap: + dutyFound = true + if dutyFound and not(alreadyWarned): + info "Sync committee duties re-organization", sdap, epoch + alreadyWarned = true + res.add((epoch, sdap)) + res if len(addOrReplaceItems) > 0: - for epoch, duty in items(addOrReplaceItems): - var validatorDuties = - vc.syncCommitteeDuties.getOrDefault(duty.pubkey) - validatorDuties.duties[epoch] = duty - vc.syncCommitteeDuties[duty.pubkey] = validatorDuties + for epoch, sdap in items(addOrReplaceItems): + vc.syncCommitteeDuties.mgetOrPut(sdap.data.pubkey, + default(EpochSyncDuties)).duties[epoch] = sdap return len(addOrReplaceItems) @@ -431,6 +647,8 @@ proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} = if (counts[0].count == 0) and (counts[1].count == 0): debug "No new attester's duties received", slot = currentSlot + await service.fillAttestationSlotSignatures(@[currentEpoch, nextEpoch]) + let subscriptions = block: var res: seq[RestCommitteeSubscription] @@ -490,22 +708,26 @@ proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} = ) res - (counts, total) = + (counts, epochs, total) = block: var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod, count: int]] + var periods: seq[Epoch] var total = 0 if len(dutyPeriods) > 0: for (epoch, period) in dutyPeriods: let count = await service.pollForSyncCommitteeDuties(epoch) res.add((epoch: epoch, period: period, count: count)) + periods.add(epoch) total += count - (res, total) + (res, periods, total) if total == 0: debug "No new sync committee member's duties received", slot = currentSlot + await service.fillSyncSlotSignatures(epochs) + let subscriptions = block: var res: seq[RestSyncCommitteeSubscription] @@ -668,8 +890,13 @@ proc attesterDutiesLoop(service: DutiesServiceRef) {.async.} = ) doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") while true: - await service.pollForAttesterDuties() await service.waitForNextSlot(AttesterLoop) + # Cleaning up previous attestation duties task. + if not(isNil(service.pollingAttesterDutiesTask)) and + not(service.pollingAttesterDutiesTask.finished()): + await cancelAndWait(service.pollingAttesterDutiesTask) + # Spawning new attestation duties task. + service.pollingAttesterDutiesTask = service.pollForAttesterDuties() proc proposerDutiesLoop(service: DutiesServiceRef) {.async.} = let vc = service.client @@ -727,8 +954,13 @@ proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} = ) doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") while true: - await service.pollForSyncCommitteeDuties() await service.waitForNextSlot(SyncCommitteeLoop) + # Cleaning up previous attestation duties task. + if not(isNil(service.pollingSyncDutiesTask)) and + not(service.pollingSyncDutiesTask.finished()): + await cancelAndWait(service.pollingSyncDutiesTask) + # Spawning new attestation duties task. + service.pollingSyncDutiesTask = service.pollForAttesterDuties() template checkAndRestart(serviceLoop: DutiesServiceLoop, future: Future[void], body: untyped): untyped = @@ -803,6 +1035,12 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = pending.add(prepareFut.cancelAndWait()) if not(isNil(registerFut)) and not(registerFut.finished()): pending.add(registerFut.cancelAndWait()) + if not(isNil(service.pollingAttesterDutiesTask)) and + not(service.pollingAttesterDutiesTask.finished()): + pending.add(service.pollingAttesterDutiesTask.cancelAndWait()) + if not(isNil(service.pollingSyncDutiesTask)) and + not(service.pollingSyncDutiesTask.finished()): + pending.add(service.pollingSyncDutiesTask.cancelAndWait()) await allFutures(pending) true except CatchableError as exc: diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index c51fc31f4..93e642375 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -25,19 +25,27 @@ type validator: AttachedValidator subcommitteeIdx: SyncSubcommitteeIndex + ValidatorAndSig* = object + validator: AttachedValidator + signature: ValidatorSig + proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef, slot: Slot, beaconBlockRoot: Eth2Digest, - duty: SyncCommitteeDuty): Future[bool] {. + duty: SyncCommitteeDuty, + index: IndexInSyncCommittee): Future[bool] {. async.} = let vc = service.client fork = vc.forkAtEpoch(slot.epoch) genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root - vindex = duty.validator_index - subcommitteeIdx = getSubcommitteeIndex( - duty.validator_sync_committee_index) validator = vc.getValidatorForDuties( duty.pubkey, slot, slashingSafe = true).valueOr: return false + subcommitteeIdx = getSubcommitteeIndex(index) + + logScope: + validator = shortLog(validator) + + let message = block: let res = await getSyncCommitteeMessage(validator, fork, @@ -50,8 +58,8 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef, return res.get() - debug "Sending sync committee message", message = shortLog(message), - validator = shortLog(validator), validator_index = vindex, + debug "Sending sync committee message", + message = shortLog(message), delay = vc.getDelay(message.slot.sync_committee_message_deadline()) let res = @@ -60,18 +68,14 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef, except ValidatorApiError as exc: warn "Unable to publish sync committee message", message = shortLog(message), - validator = shortLog(validator), - validator_index = vindex, reason = exc.getFailureReason() return false - except CancelledError: + except CancelledError as exc: debug "Publish sync committee message request was interrupted" - return false + raise exc except CatchableError as exc: error "Unexpected error occurred while publishing sync committee message", message = shortLog(message), - validator = shortLog(validator), - validator_index = vindex, err_name = exc.name, err_msg = exc.msg return false @@ -81,32 +85,32 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef, beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds()) notice "Sync committee message published", message = shortLog(message), - validator = shortLog(validator), - validator_index = vindex, delay = delay else: warn "Sync committee message was not accepted by beacon node", message = shortLog(message), - validator = shortLog(validator), - validator_index = vindex, delay = delay + delay = delay return res proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef, slot: Slot, beaconBlockRoot: Eth2Digest, - duties: seq[SyncCommitteeDuty]) + duties: seq[SyncDutyAndProof]) {.async.} = let vc = service.client let pendingSyncCommitteeMessages = block: var res: seq[Future[bool]] - for duty in duties: + for sdap in duties: + let duty = sdap.data debug "Serving sync message duty", duty, epoch = slot.epoch() - res.add(service.serveSyncCommitteeMessage(slot, - beaconBlockRoot, - duty)) + for syncCommitteeIndex in duty.validator_sync_committee_indices: + res.add(service.serveSyncCommitteeMessage(slot, + beaconBlockRoot, + duty, + syncCommitteeIndex)) res let statistics = @@ -180,9 +184,9 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef, err_msg = exc.msg, reason = exc.getFailureReason() false - except CancelledError: + except CancelledError as exc: debug "Publish sync contribution request was interrupted" - return false + raise exc except CatchableError as err: error "Unexpected error occurred while publishing sync contribution", contribution = shortLog(proof.contribution), @@ -205,73 +209,61 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef, proc produceAndPublishContributions(service: SyncCommitteeServiceRef, slot: Slot, beaconBlockRoot: Eth2Digest, - duties: seq[SyncCommitteeDuty]) {.async.} = + duties: seq[SyncDutyAndProof]) {.async.} = let vc = service.client - epoch = slot.epoch + epoch = slot.epoch() fork = vc.forkAtEpoch(epoch) - var slotSignatureReqs: seq[Future[SignatureResult]] - var validators: seq[(AttachedValidator, SyncSubcommitteeIndex)] + var + contributionFuts: array[SYNC_COMMITTEE_SUBNET_COUNT, + Future[SyncCommitteeContribution]] - for duty in duties: - let - validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr: - continue - subCommitteeIdx = - getSubcommitteeIndex(duty.validator_sync_committee_index) - future = validator.getSyncCommitteeSelectionProof( - fork, - vc.beaconGenesis.genesis_validators_root, - slot, - subCommitteeIdx) + let validatorContributions = + block: + var res: seq[ContributionItem] + for sdap in duties: + let duty = sdap.data + for syncCommitteeIndex in duty.validator_sync_committee_indices: + let + subCommitteeIndex = getSubcommitteeIndex(syncCommitteeIndex) + slotIndex = slot - slot.epoch().start_slot() + signature = + block: + let signatures = sdap.slotSigs.getOrDefault(subCommitteeIndex) + if signatures[slotIndex].isNone(): + continue + signatures[slotIndex].get() + validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr: + continue + vindex = validator.index.valueOr: + continue + if is_sync_committee_aggregator(signature): + res.add(ContributionItem( + aggregator_index: uint64(vindex), + selection_proof: signature, + validator: validator, + subcommitteeIdx: subCommitteeIndex + )) + if isNil(contributionFuts[int(subCommitteeIndex)]): + contributionFuts[int(subCommitteeIndex)] = + vc.produceSyncCommitteeContribution( + slot, subCommitteeIndex, beaconBlockRoot, + ApiStrategyKind.Best) + res - slotSignatureReqs.add(future) - validators.add((validator, subCommitteeIdx)) + let pendingFutures = contributionFuts.filterIt(not(isNil(it))) try: - await allFutures(slotSignatureReqs) + await allFutures(pendingFutures) except CancelledError as exc: - var pendingCancel: seq[Future[void]] - for future in slotSignatureReqs: - if not(future.finished()): - pendingCancel.add(future.cancelAndWait()) - await allFutures(pendingCancel) + var pending: seq[Future[void]] + for fut in pendingFutures: + if not(fut.finished()): + pending.add(fut.cancelAndWait()) + await allFutures(pending) raise exc - var - contributionsFuts: array[SYNC_COMMITTEE_SUBNET_COUNT, - Future[SyncCommitteeContribution]] - - let validatorContributions = block: - var res: seq[ContributionItem] - for idx, fut in slotSignatureReqs: - if fut.completed: - let - sigRes = fut.read - validator = validators[idx][0] - subCommitteeIdx = validators[idx][1] - if sigRes.isErr(): - warn "Unable to create slot signature using remote signer", - validator = shortLog(validator), - error_msg = sigRes.error() - elif validator.index.isSome and - is_sync_committee_aggregator(sigRes.get): - res.add ContributionItem( - aggregator_index: uint64(validator.index.get), - selection_proof: sigRes.get, - validator: validator, - subcommitteeIdx: subCommitteeIdx) - - if isNil(contributionsFuts[subCommitteeIdx]): - contributionsFuts[int subCommitteeIdx] = - vc.produceSyncCommitteeContribution( - slot, - subCommitteeIdx, - beaconBlockRoot, - ApiStrategyKind.Best) - res - if len(validatorContributions) > 0: let pendingAggregates = block: @@ -279,22 +271,21 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef, for item in validatorContributions: let aggContribution = try: - await contributionsFuts[item.subcommitteeIdx] + contributionFuts[item.subcommitteeIdx].read() except ValidatorApiError as exc: warn "Unable to get sync message contribution data", slot = slot, beaconBlockRoot = shortLog(beaconBlockRoot), reason = exc.getFailureReason() - return - except CancelledError: + continue + except CancelledError as exc: debug "Request for sync message contribution was interrupted" - return + raise exc except CatchableError as exc: error "Unexpected error occurred while getting sync message "& "contribution", slot = slot, beaconBlockRoot = shortLog(beaconBlockRoot), err_name = exc.name, err_msg = exc.msg - return - + continue let proof = ContributionAndProof( aggregator_index: item.aggregator_index, contribution: aggContribution, @@ -308,12 +299,12 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef, var errored, succeed, failed = 0 try: await allFutures(pendingAggregates) - except CancelledError as err: + except CancelledError as exc: for fut in pendingAggregates: if not(fut.finished()): fut.cancel() await allFutures(pendingAggregates) - raise err + raise exc for future in pendingAggregates: if future.completed(): @@ -336,7 +327,7 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef, proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, slot: Slot, - duties: seq[SyncCommitteeDuty]) {. + duties: seq[SyncDutyAndProof]) {. async.} = let vc = service.client @@ -367,9 +358,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, warn "Unable to retrieve head block's root to sign", reason = exc.msg, reason = exc.getFailureReason() return - except CancelledError: + except CancelledError as exc: debug "Block root request was interrupted" - return + raise exc except CatchableError as exc: error "Unexpected error while requesting sync message block root", err_name = exc.name, err_msg = exc.msg, slot = slot @@ -383,9 +374,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, warn "Unable to proceed sync committee messages", slot = slot, duties_count = len(duties), reason = exc.getFailureReason() return - except CancelledError: + except CancelledError as exc: debug "Sync committee producing process was interrupted" - return + raise exc except CatchableError as exc: error "Unexpected error while producing sync committee messages", slot = slot,